diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f2c54e2e3b0..7e2430fdbef 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -639,54 +639,77 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock return kl.runContainer(manifest, container, "") } +func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerId) error { + // Make sure we have a network container + netId, err := kl.getNetworkContainerId(manifest) + if err != nil { + glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID) + return err + } + if netId == "" { + glog.Infof("Network container doesn't exist, creating") + netId, err = kl.createNetworkContainer(manifest) + if err != nil { + glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID) + return err + } + } + keepChannel <- netId + for _, container := range manifest.Containers { + containerId, err := kl.getContainerId(manifest, &container) + if err != nil { + glog.Errorf("Error finding container: %v skipping id %s.", err, manifest.ID) + continue + } + if containerId == "" { + glog.Infof("%+v doesn't exist, creating", container) + kl.DockerPuller.Pull(container.Image) + if err != nil { + glog.Errorf("Failed to create container: %v Skipping container %s", err, manifest.ID) + continue + } + containerId, err = kl.runContainer(manifest, &container, "container:"+string(netId)) + if err != nil { + // TODO(bburns) : Perhaps blacklist a container after N failures? + glog.Errorf("Error running container: %v skipping.", err) + continue + } + } else { + glog.V(1).Infof("%s exists as %v", container.Name, containerId) + } + keepChannel <- containerId + } + return nil +} + // Sync the configured list of containers (desired state) with the host current state func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { glog.Infof("Desired: %+v", config) var err error dockerIdsToKeep := map[DockerId]bool{} + keepChannel := make(chan DockerId) + waitGroup := sync.WaitGroup{} // Check for any containers that need starting for _, manifest := range config { - // Make sure we have a network container - netId, err := kl.getNetworkContainerId(&manifest) - if err != nil { - glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID) - continue - } - if netId == "" { - glog.Infof("Network container doesn't exist, creating") - netId, err = kl.createNetworkContainer(&manifest) + waitGroup.Add(1) + go func() { + defer util.HandleCrash() + defer waitGroup.Done() + err := kl.syncManifest(&manifest, keepChannel) if err != nil { - glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID) - continue + glog.Errorf("Error syncing manifest: %v skipping.", err) } + }() + } + go func() { + for id := range keepChannel { + dockerIdsToKeep[id] = true } - dockerIdsToKeep[netId] = true - - for _, container := range manifest.Containers { - containerId, err := kl.getContainerId(&manifest, &container) - if err != nil { - glog.Errorf("Error detecting container: %v skipping.", err) - continue - } - if containerId == "" { - glog.Infof("%+v doesn't exist, creating", container) - kl.DockerPuller.Pull(container.Image) - if err != nil { - glog.Errorf("Error pulling container: %v", err) - continue - } - containerId, err = kl.runContainer(&manifest, &container, "container:"+string(netId)) - if err != nil { - // TODO(bburns) : Perhaps blacklist a container after N failures? - glog.Errorf("Error creating container: %v", err) - continue - } - } else { - glog.V(1).Infof("%s exists as %v", container.Name, containerId) - } - dockerIdsToKeep[containerId] = true - } + }() + if len(config) > 0 { + waitGroup.Wait() + close(keepChannel) } // Kill any containers we don't need