Merge pull request #320 from brendandburns/async

Make each pod synchronization in the kubelet an independent thread.
This commit is contained in:
Daniel Smith 2014-07-01 12:42:33 -07:00
commit a10ac51224

View File

@ -639,54 +639,77 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
return kl.runContainer(manifest, container, "")
}
// syncManifest brings the containers described by a single manifest in line
// with the desired state.
//
// It first makes sure the manifest's network container exists (creating it if
// it is missing), then walks manifest.Containers and starts every container
// that is not already present, attaching it to the network container.
//
// The id of every container that should survive this sync (the network
// container plus each container that already existed or was started
// successfully) is sent on keepChannel. The caller must be receiving from
// keepChannel while this runs; a send on an unbuffered channel blocks until it
// is received.
//
// An error is returned only when the network container cannot be looked up or
// created. Failures affecting an individual container are logged and that
// container is skipped, so the remaining containers are still processed.
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerId) error {
	// Make sure we have a network container
	netId, err := kl.getNetworkContainerId(manifest)
	if err != nil {
		glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID)
		return err
	}
	if netId == "" {
		glog.Infof("Network container doesn't exist, creating")
		netId, err = kl.createNetworkContainer(manifest)
		if err != nil {
			glog.Errorf("Failed to create network container. (%v) Skipping container %s", err, manifest.ID)
			return err
		}
	}
	keepChannel <- netId
	for _, container := range manifest.Containers {
		containerId, err := kl.getContainerId(manifest, &container)
		if err != nil {
			glog.Errorf("Error finding container: %v skipping id %s.", err, manifest.ID)
			continue
		}
		if containerId == "" {
			glog.Infof("%+v doesn't exist, creating", container)
			// Capture the pull result: previously it was discarded and the
			// check below tested the stale (nil) error from getContainerId,
			// so a failed pull still fell through to runContainer.
			err = kl.DockerPuller.Pull(container.Image)
			if err != nil {
				glog.Errorf("Failed to pull image: %v Skipping container %s", err, manifest.ID)
				continue
			}
			containerId, err = kl.runContainer(manifest, &container, "container:"+string(netId))
			if err != nil {
				// TODO(bburns) : Perhaps blacklist a container after N failures?
				glog.Errorf("Error running container: %v skipping.", err)
				continue
			}
		} else {
			glog.V(1).Infof("%s exists as %v", container.Name, containerId)
		}
		keepChannel <- containerId
	}
	return nil
}
// SyncManifests syncs the configured list of containers (desired state) with the host's current state.
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
glog.Infof("Desired: %+v", config)
var err error
dockerIdsToKeep := map[DockerId]bool{}
keepChannel := make(chan DockerId)
waitGroup := sync.WaitGroup{}
// Check for any containers that need starting
for _, manifest := range config {
// Make sure we have a network container
netId, err := kl.getNetworkContainerId(&manifest)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID)
continue
}
if netId == "" {
glog.Infof("Network container doesn't exist, creating")
netId, err = kl.createNetworkContainer(&manifest)
waitGroup.Add(1)
go func() {
defer util.HandleCrash()
defer waitGroup.Done()
err := kl.syncManifest(&manifest, keepChannel)
if err != nil {
glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID)
continue
glog.Errorf("Error syncing manifest: %v skipping.", err)
}
}()
}
go func() {
for id := range keepChannel {
dockerIdsToKeep[id] = true
}
dockerIdsToKeep[netId] = true
for _, container := range manifest.Containers {
containerId, err := kl.getContainerId(&manifest, &container)
if err != nil {
glog.Errorf("Error detecting container: %v skipping.", err)
continue
}
if containerId == "" {
glog.Infof("%+v doesn't exist, creating", container)
kl.DockerPuller.Pull(container.Image)
if err != nil {
glog.Errorf("Error pulling container: %v", err)
continue
}
containerId, err = kl.runContainer(&manifest, &container, "container:"+string(netId))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error creating container: %v", err)
continue
}
} else {
glog.V(1).Infof("%s exists as %v", container.Name, containerId)
}
dockerIdsToKeep[containerId] = true
}
}()
if len(config) > 0 {
waitGroup.Wait()
close(keepChannel)
}
// Kill any containers we don't need