From 7999983311ddfb44399215f13c970075c95f87ec Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Mon, 30 Jun 2014 22:27:56 -0700 Subject: [PATCH] Added async behavior. --- pkg/kubelet/kubelet.go | 98 ++++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f2c54e2e3b0..4b5cf472671 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -639,55 +639,77 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock return kl.runContainer(manifest, container, "") } +func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep *map[DockerId]bool, mapLock *sync.Mutex) error { + // Make sure we have a network container + netId, err := kl.getNetworkContainerId(manifest) + if err != nil { + glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.Id) + return err + } + if netId == "" { + glog.Infof("Network container doesn't exist, creating") + netId, err = kl.createNetworkContainer(manifest) + if err != nil { + glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID) + return err + } + } + { + mapLock.Lock() + defer mapLock.Unlock() + (*dockerIdsToKeep)[netId] = true + } + for _, container := range manifest.Containers { + containerId, err := kl.getContainerId(manifest, &container) + if err != nil { + glog.Errorf("Error finding container: %v skipping.", err) + continue + } + if containerId == "" { + glog.Infof("%+v doesn't exist, creating", container) + kl.DockerPuller.Pull(container.Image) + if err != nil { + glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID) + continue + } + containerId, err = kl.runContainer(manifest, &container, "container:"+string(netId)) + if err != nil { + // TODO(bburns) : Perhaps blacklist a container after N failures? + glog.Errorf("Error running container: %v skipping.", err) + continue + } + } else { + glog.V(1).Infof("%s exists as %v", container.Name, containerId) + } + { + mapLock.Lock() + defer mapLock.Unlock() + (*dockerIdsToKeep)[containerId] = true + } + } + return nil +} + // Sync the configured list of containers (desired state) with the host current state func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { glog.Infof("Desired: %+v", config) var err error dockerIdsToKeep := map[DockerId]bool{} + mapLock := sync.Mutex{} + waitGroup := sync.WaitGroup{} // Check for any containers that need starting for _, manifest := range config { - // Make sure we have a network container - netId, err := kl.getNetworkContainerId(&manifest) - if err != nil { - glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID) - continue - } - if netId == "" { - glog.Infof("Network container doesn't exist, creating") - netId, err = kl.createNetworkContainer(&manifest) + waitGroup.Add(1) + go func() { + err := kl.syncManifest(&manifest, &dockerIdsToKeep, &mapLock) if err != nil { - glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID) - continue + glog.Errorf("Error syncing manifest: %v skipping.", err) } - } - dockerIdsToKeep[netId] = true - - for _, container := range manifest.Containers { - containerId, err := kl.getContainerId(&manifest, &container) - if err != nil { - glog.Errorf("Error detecting container: %v skipping.", err) - continue - } - if containerId == "" { - glog.Infof("%+v doesn't exist, creating", container) - kl.DockerPuller.Pull(container.Image) - if err != nil { - glog.Errorf("Error pulling container: %v", err) - continue - } - containerId, err = kl.runContainer(&manifest, &container, "container:"+string(netId)) - if err != nil { - // TODO(bburns) : Perhaps blacklist a container after N failures? - glog.Errorf("Error creating container: %v", err) - continue - } - } else { - glog.V(1).Infof("%s exists as %v", container.Name, containerId) - } - dockerIdsToKeep[containerId] = true - } + waitGroup.Done() + }() } + waitGroup.Wait() // Kill any containers we don't need existingContainers, err := kl.getDockerContainers()