mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Added async behavior.
This commit is contained in:
parent
d75bd790d3
commit
7999983311
@ -639,55 +639,77 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
|
|||||||
return kl.runContainer(manifest, container, "")
|
return kl.runContainer(manifest, container, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep *map[DockerId]bool, mapLock *sync.Mutex) error {
|
||||||
|
// Make sure we have a network container
|
||||||
|
netId, err := kl.getNetworkContainerId(manifest)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.Id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if netId == "" {
|
||||||
|
glog.Infof("Network container doesn't exist, creating")
|
||||||
|
netId, err = kl.createNetworkContainer(manifest)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
mapLock.Lock()
|
||||||
|
defer mapLock.Unlock()
|
||||||
|
(*dockerIdsToKeep)[netId] = true
|
||||||
|
}
|
||||||
|
for _, container := range manifest.Containers {
|
||||||
|
containerId, err := kl.getContainerId(manifest, &container)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error finding container: %v skipping.", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if containerId == "" {
|
||||||
|
glog.Infof("%+v doesn't exist, creating", container)
|
||||||
|
kl.DockerPuller.Pull(container.Image)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
containerId, err = kl.runContainer(manifest, &container, "container:"+string(netId))
|
||||||
|
if err != nil {
|
||||||
|
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
||||||
|
glog.Errorf("Error running container: %v skipping.", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.V(1).Infof("%s exists as %v", container.Name, containerId)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
mapLock.Lock()
|
||||||
|
defer mapLock.Unlock()
|
||||||
|
(*dockerIdsToKeep)[containerId] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Sync the configured list of containers (desired state) with the host current state
|
// Sync the configured list of containers (desired state) with the host current state
|
||||||
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
||||||
glog.Infof("Desired: %+v", config)
|
glog.Infof("Desired: %+v", config)
|
||||||
var err error
|
var err error
|
||||||
dockerIdsToKeep := map[DockerId]bool{}
|
dockerIdsToKeep := map[DockerId]bool{}
|
||||||
|
mapLock := sync.Mutex{}
|
||||||
|
waitGroup := sync.WaitGroup{}
|
||||||
|
|
||||||
// Check for any containers that need starting
|
// Check for any containers that need starting
|
||||||
for _, manifest := range config {
|
for _, manifest := range config {
|
||||||
// Make sure we have a network container
|
waitGroup.Add(1)
|
||||||
netId, err := kl.getNetworkContainerId(&manifest)
|
go func() {
|
||||||
if err != nil {
|
err := kl.syncManifest(&manifest, &dockerIdsToKeep, &mapLock)
|
||||||
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if netId == "" {
|
|
||||||
glog.Infof("Network container doesn't exist, creating")
|
|
||||||
netId, err = kl.createNetworkContainer(&manifest)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID)
|
glog.Errorf("Error syncing manifest: %v skipping.", err)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
waitGroup.Done()
|
||||||
dockerIdsToKeep[netId] = true
|
}()
|
||||||
|
|
||||||
for _, container := range manifest.Containers {
|
|
||||||
containerId, err := kl.getContainerId(&manifest, &container)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error detecting container: %v skipping.", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if containerId == "" {
|
|
||||||
glog.Infof("%+v doesn't exist, creating", container)
|
|
||||||
kl.DockerPuller.Pull(container.Image)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error pulling container: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
containerId, err = kl.runContainer(&manifest, &container, "container:"+string(netId))
|
|
||||||
if err != nil {
|
|
||||||
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
|
||||||
glog.Errorf("Error creating container: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.V(1).Infof("%s exists as %v", container.Name, containerId)
|
|
||||||
}
|
|
||||||
dockerIdsToKeep[containerId] = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
waitGroup.Wait()
|
||||||
|
|
||||||
// Kill any containers we don't need
|
// Kill any containers we don't need
|
||||||
existingContainers, err := kl.getDockerContainers()
|
existingContainers, err := kl.getDockerContainers()
|
||||||
|
Loading…
Reference in New Issue
Block a user