Merge pull request #588 from vmarmol/simple-parallel

Sync pods asynchronously in the Kubelet.
Daniel Smith 2014-07-24 14:00:43 -07:00
commit c979900ce2
3 changed files with 162 additions and 59 deletions

==== File 1 of 3: docker container helpers ====

@@ -81,22 +81,22 @@ func (p dockerPuller) Pull(image string) error {
 // DockerContainers is a map of containers
 type DockerContainers map[DockerID]*docker.APIContainers
 
-func (c DockerContainers) FindPodContainer(manifestID, containerName string) (*docker.APIContainers, bool) {
+func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool) {
     for _, dockerContainer := range c {
         dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-        if dockerManifestID == manifestID && dockerContainerName == containerName {
+        if dockerManifestID == podFullName && dockerContainerName == containerName {
             return dockerContainer, true
         }
     }
     return nil, false
 }
 
-func (c DockerContainers) FindContainersByPodFullName(manifestID string) map[string]*docker.APIContainers {
+func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers {
     containers := make(map[string]*docker.APIContainers)
     for _, dockerContainer := range c {
         dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-        if dockerManifestID == manifestID {
+        if dockerManifestID == podFullName {
             containers[dockerContainerName] = dockerContainer
         }
     }
@@ -126,7 +126,7 @@ func getKubeletDockerContainers(client DockerInterface) (DockerContainers, error
 var ErrNoContainersInPod = errors.New("no containers exist for this pod")
 
 // GetDockerPodInfo returns docker info for all containers in the pod/manifest.
-func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, error) {
+func getDockerPodInfo(client DockerInterface, podFullName string) (api.PodInfo, error) {
     info := api.PodInfo{}
 
     containers, err := client.ListContainers(docker.ListContainersOptions{})
@@ -136,7 +136,7 @@ func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, e
     for _, value := range containers {
         dockerManifestID, dockerContainerName := parseDockerName(value.Names[0])
-        if dockerManifestID != manifestID {
+        if dockerManifestID != podFullName {
             continue
         }
         inspectResult, err := client.InspectContainer(value.ID)
@@ -173,13 +173,13 @@ func unescapeDash(in string) (out string) {
 const containerNamePrefix = "k8s"
 
-// Creates a name which can be reversed to identify both manifest id and container name.
+// Creates a name which can be reversed to identify both full pod name and container name.
 func buildDockerName(pod *Pod, container *api.Container) string {
     // Note, manifest.ID could be blank.
     return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32())
 }
 
-// Upacks a container name, returning the manifest id and container name we would have used to
+// Upacks a container name, returning the pod full name and container name we would have used to
 // construct the docker name. If the docker name isn't one we created, we may return empty strings.
 func parseDockerName(name string) (podFullName, containerName string) {
     // For some reason docker appears to be appending '/' to names.
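
The rename above is safe because the docker container name is a reversible encoding of the pod full name and container name. A minimal, self-contained sketch of that round-trip, simplified from the code above (the real helpers also dash-escape the parts via escapeDash/unescapeDash and use a random hash suffix; the fixed hash here is made up):

package main

import (
    "fmt"
    "strings"
)

const prefix = "k8s"

// buildName mirrors buildDockerName: k8s--<container>--<podFullName>--<hash>.
func buildName(podFullName, containerName string) string {
    return fmt.Sprintf("%s--%s--%s--%08x", prefix, containerName, podFullName, uint32(0xdeadbeef))
}

// parseName mirrors parseDockerName; docker prepends '/' to the names it reports.
func parseName(name string) (podFullName, containerName string) {
    parts := strings.Split(strings.TrimPrefix(name, "/"), "--")
    if len(parts) < 3 || parts[0] != prefix {
        return "", "" // not a name we created
    }
    return parts[2], parts[1]
}

func main() {
    name := buildName("bar.test", "foo")
    pod, ctr := parseName("/" + name)
    fmt.Println(name, pod, ctr) // k8s--foo--bar.test--deadbeef bar.test foo
}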

==== File 2 of 3: kubelet ====

@@ -70,6 +70,7 @@ func NewMainKubelet(
         cadvisorClient: cc,
         etcdClient:     ec,
         rootDirectory:  rd,
+        podWorkers:     newPodWorkers(),
     }
 }
@@ -80,6 +81,7 @@ func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
         hostname:     hn,
         dockerClient: dc,
         dockerPuller: &FakeDockerPuller{},
+        podWorkers:   newPodWorkers(),
     }
 }
@@ -88,6 +90,7 @@ type Kubelet struct {
     hostname      string
     dockerClient  DockerInterface
     rootDirectory string
+    podWorkers    podWorkers
 
     // Optional, no events will be sent without it
     etcdClient tools.EtcdClient
@@ -115,6 +118,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
     kl.syncLoop(updates, kl)
 }
 
+// Per-pod workers.
+type podWorkers struct {
+    lock sync.Mutex
+
+    // Set of pods with existing workers.
+    workers util.StringSet
+}
+
+func newPodWorkers() podWorkers {
+    return podWorkers{
+        workers: util.NewStringSet(),
+    }
+}
+
+// Runs a worker for "podFullName" asynchronously with the specified "action".
+// If the worker for the "podFullName" is already running, functions as a no-op.
+func (self *podWorkers) Run(podFullName string, action func()) {
+    self.lock.Lock()
+    defer self.lock.Unlock()
+
+    // This worker is already running, let it finish.
+    if self.workers.Has(podFullName) {
+        return
+    }
+    self.workers.Insert(podFullName)
+
+    // Run worker async.
+    go func() {
+        defer util.HandleCrash()
+        action()
+
+        self.lock.Lock()
+        defer self.lock.Unlock()
+        self.workers.Delete(podFullName)
+    }()
+}
+
 // LogEvent logs an event to the etcd backend.
 func (kl *Kubelet) LogEvent(event *api.Event) error {
     if kl.etcdClient == nil {
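
The podWorkers type added above guarantees at most one in-flight sync per pod: Run is a no-op for a pod whose worker is still executing, and each worker removes itself from the set once its action finishes. A self-contained sketch of those semantics, with a plain map and mutex standing in for util.StringSet and made-up pod names:

package main

import (
    "fmt"
    "sync"
    "time"
)

type workers struct {
    lock sync.Mutex
    busy map[string]bool // pods with a running worker
}

func (w *workers) Run(pod string, action func()) {
    w.lock.Lock()
    defer w.lock.Unlock()
    if w.busy[pod] {
        return // a worker for this pod is already running: no-op
    }
    w.busy[pod] = true
    go func() {
        action()
        w.lock.Lock()
        defer w.lock.Unlock()
        delete(w.busy, pod)
    }()
}

func main() {
    w := &workers{busy: map[string]bool{}}
    w.Run("foo.test", func() {
        time.Sleep(50 * time.Millisecond)
        fmt.Println("synced foo.test")
    })
    w.Run("foo.test", func() { fmt.Println("dropped") }) // no-op: first worker still running
    time.Sleep(100 * time.Millisecond)                   // let the worker finish before exiting
}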
@@ -262,6 +302,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
 // Kill a docker container
 func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
+    glog.Infof("Killing: %s", dockerContainer.ID)
     err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
     podFullName, containerName := parseDockerName(dockerContainer.Names[0])
     kl.LogEvent(&api.Event{
@@ -300,9 +341,14 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
     return kl.runContainer(pod, container, nil, "")
 }
 
-func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
-    podFullName := GetPodFullName(pod)
+type empty struct{}
+
+func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
+    podFullName := GetPodFullName(pod)
+    containersToKeep := make(map[DockerID]empty)
+    killedContainers := make(map[DockerID]empty)
+
+    // Make sure we have a network container
     var netID DockerID
     if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
         netID = DockerID(networkDockerContainer.ID)
@@ -315,7 +361,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
         }
         netID = dockerNetworkID
     }
-    keepChannel <- netID
+    containersToKeep[netID] = empty{}
 
     podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
     if err != nil {
@@ -335,7 +381,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
                 continue
             }
             if healthy == health.Healthy {
-                keepChannel <- containerID
+                containersToKeep[containerID] = empty{}
                 continue
             }
@@ -344,6 +390,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
                 glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
                 continue
             }
+            killedContainers[containerID] = empty{}
         }
 
         glog.Infof("Container doesn't exist, creating %#v", container)
@@ -357,20 +404,38 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
             glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
             continue
         }
-        keepChannel <- containerID
+        containersToKeep[containerID] = empty{}
     }
 
+    // Kill any containers in this pod which were not identified above (guards against duplicates).
+    for id, container := range dockerContainers {
+        curPodFullName, _ := parseDockerName(container.Names[0])
+        if curPodFullName == podFullName {
+            // Don't kill containers we want to keep or those we already killed.
+            _, keep := containersToKeep[id]
+            _, killed := killedContainers[id]
+            if !keep && !killed {
+                err = kl.killContainer(*container)
+                if err != nil {
+                    glog.Errorf("Error killing container: %v", err)
+                }
+            }
+        }
+    }
+
     return nil
 }
 
-type empty struct{}
+type podContainer struct {
+    podFullName   string
+    containerName string
+}
 
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
 func (kl *Kubelet) SyncPods(pods []Pod) error {
     glog.Infof("Desired [%s]: %+v", kl.hostname, pods)
     var err error
-    dockerIdsToKeep := map[DockerID]empty{}
-    keepChannel := make(chan DockerID, defaultChanSize)
-    waitGroup := sync.WaitGroup{}
+    desiredContainers := make(map[podContainer]empty)
 
     dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
     if err != nil {
@@ -380,30 +445,23 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
     // Check for any containers that need starting
     for i := range pods {
-        waitGroup.Add(1)
-        go func(index int) {
-            defer util.HandleCrash()
-            defer waitGroup.Done()
-            // necessary to dereference by index here b/c otherwise the shared value
-            // in the for each is re-used.
-            err := kl.syncPod(&pods[index], dockerContainers, keepChannel)
+        pod := &pods[i]
+        podFullName := GetPodFullName(pod)
+
+        // Add all containers (including net) to the map.
+        desiredContainers[podContainer{podFullName, networkContainerName}] = empty{}
+        for _, cont := range pod.Manifest.Containers {
+            desiredContainers[podContainer{podFullName, cont.Name}] = empty{}
+        }
+
+        // Run the sync in an async manifest worker.
+        kl.podWorkers.Run(podFullName, func() {
+            err := kl.syncPod(pod, dockerContainers)
             if err != nil {
                 glog.Errorf("Error syncing pod: %v skipping.", err)
             }
-        }(i)
+        })
     }
-    ch := make(chan bool)
-    go func() {
-        for id := range keepChannel {
-            dockerIdsToKeep[id] = empty{}
-        }
-        ch <- true
-    }()
-    if len(pods) > 0 {
-        waitGroup.Wait()
-    }
-    close(keepChannel)
-    <-ch
 
     // Kill any containers we don't need
     existingContainers, err := getKubeletDockerContainers(kl.dockerClient)
@@ -411,9 +469,10 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
         glog.Errorf("Error listing containers: %v", err)
         return err
     }
-    for id, container := range existingContainers {
-        if _, ok := dockerIdsToKeep[id]; !ok {
-            glog.Infof("Killing: %s", id)
+    for _, container := range existingContainers {
+        // Don't kill containers that are in the desired pods.
+        podFullName, containerName := parseDockerName(container.Names[0])
+        if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok {
             err = kl.killContainer(*container)
             if err != nil {
                 glog.Errorf("Error killing container: %v", err)
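
Taken together, SyncPods no longer blocks on a WaitGroup and a keepChannel to learn which container IDs to keep: it builds the desired (pod full name, container name) set synchronously, hands each pod's sync to a podWorker, and reaps strays by parsing container names back into that key. A self-contained sketch of just the reaping decision, with made-up pod and container names:

package main

import "fmt"

// podContainer mirrors the key type introduced above.
type podContainer struct{ podFullName, containerName string }

func main() {
    // Desired state, built synchronously from the pod manifests.
    desired := map[podContainer]struct{}{
        {"bar.test", "net"}: {},
        {"bar.test", "foo"}: {},
    }
    // Containers observed on the host, parsed back from their docker names.
    observed := []podContainer{{"bar.test", "foo"}, {"fiz.test", "baz"}}
    for _, c := range observed {
        if _, ok := desired[c]; !ok {
            fmt.Printf("would kill %s/%s\n", c.podFullName, c.containerName)
        }
    }
}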

==== File 3 of 3: kubelet tests ====

@@ -62,6 +62,7 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
     kubelet.dockerPuller = &FakeDockerPuller{}
     kubelet.etcdClient = fakeEtcdClient
     kubelet.rootDirectory = "/tmp/kubelet"
+    kubelet.podWorkers = newPodWorkers()
     return kubelet, fakeEtcdClient, fakeDocker
 }
@@ -269,7 +270,7 @@ func TestSyncPodsDeletes(t *testing.T) {
     expectNoError(t, err)
     verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"})
 
-    // A map interation is used to delete containers, so must not depend on
+    // A map iteration is used to delete containers, so must not depend on
     // order here.
     expectedToStop := map[string]bool{
         "1234": true,
@@ -282,29 +283,71 @@ func TestSyncPodsDeletes(t *testing.T) {
     }
 }
 
+func TestSyncPodDeletesDuplicate(t *testing.T) {
+    kubelet, _, fakeDocker := makeTestKubelet(t)
+    dockerContainers := DockerContainers{
+        "1234": &docker.APIContainers{
+            // the k8s prefix is required for the kubelet to manage the container
+            Names: []string{"/k8s--foo--bar.test--1"},
+            ID:    "1234",
+        },
+        "9876": &docker.APIContainers{
+            // network container
+            Names: []string{"/k8s--net--bar.test--"},
+            ID:    "9876",
+        },
+        "4567": &docker.APIContainers{
+            // Duplicate for the same container.
+            Names: []string{"/k8s--foo--bar.test--2"},
+            ID:    "4567",
+        },
+        "2304": &docker.APIContainers{
+            // Container for another pod, untouched.
+            Names: []string{"/k8s--baz--fiz.test--6"},
+            ID:    "2304",
+        },
+    }
+    err := kubelet.syncPod(&Pod{
+        Name:      "bar",
+        Namespace: "test",
+        Manifest: api.ContainerManifest{
+            ID: "bar",
+            Containers: []api.Container{
+                {Name: "foo"},
+            },
+        },
+    }, dockerContainers)
+    expectNoError(t, err)
+    verifyCalls(t, fakeDocker, []string{"stop"})
+
+    // Expect one of the duplicates to be killed.
+    if len(fakeDocker.stopped) != 1 || (len(fakeDocker.stopped) != 0 && fakeDocker.stopped[0] != "1234" && fakeDocker.stopped[0] != "4567") {
+        t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
+    }
+}
+
 type FalseHealthChecker struct{}
 
 func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status, error) {
     return health.Unhealthy, nil
 }
 
-func TestSyncPodsUnhealthy(t *testing.T) {
+func TestSyncPodUnhealthy(t *testing.T) {
     kubelet, _, fakeDocker := makeTestKubelet(t)
     kubelet.healthChecker = &FalseHealthChecker{}
-    fakeDocker.containerList = []docker.APIContainers{
-        {
+    dockerContainers := DockerContainers{
+        "1234": &docker.APIContainers{
             // the k8s prefix is required for the kubelet to manage the container
             Names: []string{"/k8s--bar--foo.test"},
             ID:    "1234",
         },
-        {
+        "9876": &docker.APIContainers{
             // network container
             Names: []string{"/k8s--net--foo.test--"},
             ID:    "9876",
         },
     }
-    err := kubelet.SyncPods([]Pod{
-        {
+    err := kubelet.syncPod(&Pod{
         Name:      "foo",
         Namespace: "test",
         Manifest: api.ContainerManifest{
@@ -318,9 +361,9 @@ func TestSyncPodsUnhealthy(t *testing.T) {
             },
         },
     },
-    }})
+    }, dockerContainers)
     expectNoError(t, err)
-    verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list"})
+    verifyCalls(t, fakeDocker, []string{"stop", "create", "start"})
 
     // A map interation is used to delete containers, so must not depend on
     // order here.
@@ -699,6 +742,7 @@ func TestGetRooInfo(t *testing.T) {
         dockerClient:   &fakeDocker,
         dockerPuller:   &FakeDockerPuller{},
         cadvisorClient: mockCadvisor,
+        podWorkers:     newPodWorkers(),
     }
 
     // If the container name is an empty string, then it means the root container.