|
|
|
@@ -116,7 +116,7 @@ func (kl *Kubelet) RunKubelet(config_path, manifest_url, etcd_servers, address,
|
|
|
|
|
glog.Infof("Watching for HTTP configs at %s", manifest_url)
|
|
|
|
|
go util.Forever(func() {
|
|
|
|
|
if err := kl.extractFromHTTP(manifest_url, updateChannel); err != nil {
|
|
|
|
|
glog.Errorf("Error syncing http: %#v", err)
|
|
|
|
|
glog.Errorf("Error syncing http: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}, kl.HTTPCheckFrequency)
|
|
|
|
|
}
|
|
|
|
@@ -166,7 +166,7 @@ func (kl *Kubelet) LogEvent(event *api.Event) error {
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error writing event: %s\n", err)
|
|
|
|
|
if response != nil {
|
|
|
|
|
glog.Infof("Response was: %#v\n", *response)
|
|
|
|
|
glog.Infof("Response was: %v\n", *response)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
@@ -427,21 +427,21 @@ func (kl *Kubelet) WatchFiles(config_path string, updateChannel chan<- manifestU
|
|
|
|
|
statInfo, err := os.Stat(config_path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if !os.IsNotExist(err) {
|
|
|
|
|
glog.Errorf("Error accessing path: %#v", err)
|
|
|
|
|
glog.Errorf("Error accessing path: %v", err)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if statInfo.Mode().IsDir() {
|
|
|
|
|
manifests, err := kl.extractFromDir(config_path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error polling dir: %#v", err)
|
|
|
|
|
glog.Errorf("Error polling dir: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
updateChannel <- manifestUpdate{fileSource, manifests}
|
|
|
|
|
} else if statInfo.Mode().IsRegular() {
|
|
|
|
|
manifest, err := kl.extractFromFile(config_path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error polling file: %#v", err)
|
|
|
|
|
glog.Errorf("Error polling file: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
|
|
|
|
@@ -498,7 +498,7 @@ func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpda
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
|
|
|
|
|
"single manifest (%v: %#v) or as multiple manifests (%v: %#v).\n",
|
|
|
|
|
"single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n",
|
|
|
|
|
url, string(data), singleErr, manifest, multiErr, manifests)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -506,7 +506,7 @@ func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpda
|
|
|
|
|
// Return a list of containers, or an error if one occurs.
|
|
|
|
|
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
|
|
|
|
|
if response.Node == nil || len(response.Node.Value) == 0 {
|
|
|
|
|
return nil, fmt.Errorf("no nodes field: %#v", response)
|
|
|
|
|
return nil, fmt.Errorf("no nodes field: %v", response)
|
|
|
|
|
}
|
|
|
|
|
var manifests []api.ContainerManifest
|
|
|
|
|
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
|
|
|
|
@@ -519,12 +519,12 @@ func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- mani
|
|
|
|
|
if util.IsEtcdNotFound(err) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
glog.Errorf("Error on etcd get of %s: %#v", key, err)
|
|
|
|
|
glog.Errorf("Error on etcd get of %s: %v", key, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
manifests, err := kl.ResponseToManifests(response)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error parsing response (%#v): %s", response, err)
|
|
|
|
|
glog.Errorf("Error parsing response (%v): %s", response, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
glog.Infof("Got state from etcd: %+v", manifests)
|
|
|
|
@@ -585,7 +585,7 @@ func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
|
|
|
|
|
func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) {
|
|
|
|
|
var manifests []api.ContainerManifest
|
|
|
|
|
if response.Node == nil || len(response.Node.Value) == 0 {
|
|
|
|
|
return manifests, fmt.Errorf("no nodes field: %#v", response)
|
|
|
|
|
return manifests, fmt.Errorf("no nodes field: %v", response)
|
|
|
|
|
}
|
|
|
|
|
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
|
|
|
|
|
return manifests, err
|
|
|
|
@@ -601,13 +601,13 @@ func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel c
|
|
|
|
|
if watchResponse == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
glog.Infof("Got etcd change: %#v", watchResponse)
|
|
|
|
|
glog.Infof("Got etcd change: %v", watchResponse)
|
|
|
|
|
manifests, err := kl.extractFromEtcd(watchResponse)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error handling response from etcd: %#v", err)
|
|
|
|
|
glog.Errorf("Error handling response from etcd: %v", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
glog.Infof("manifests: %#v", manifests)
|
|
|
|
|
glog.Infof("manifests: %+v", manifests)
|
|
|
|
|
// Ok, we have a valid configuration, send to channel for
|
|
|
|
|
// rejiggering.
|
|
|
|
|
updateChannel <- manifestUpdate{etcdSource, manifests}
|
|
|
|
@@ -641,7 +641,7 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
|
|
|
|
|
|
|
|
|
|
// Sync the configured list of containers (desired state) with the host current state
|
|
|
|
|
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|
|
|
|
glog.Infof("Desired: %#v", config)
|
|
|
|
|
glog.Infof("Desired: %+v", config)
|
|
|
|
|
var err error
|
|
|
|
|
dockerIdsToKeep := map[DockerId]bool{}
|
|
|
|
|
|
|
|
|
@@ -650,14 +650,14 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|
|
|
|
// Make sure we have a network container
|
|
|
|
|
netId, err := kl.getNetworkContainerId(&manifest)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Failed to introspect network container. (%#v) Skipping container %s", err, manifest.Id)
|
|
|
|
|
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.Id)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if netId == "" {
|
|
|
|
|
glog.Infof("Network container doesn't exist, creating")
|
|
|
|
|
netId, err = kl.createNetworkContainer(&manifest)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Failed to create network container: %#v Skipping container %s", err, manifest.Id)
|
|
|
|
|
glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.Id)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -666,24 +666,24 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|
|
|
|
for _, container := range manifest.Containers {
|
|
|
|
|
containerId, err := kl.getContainerId(&manifest, &container)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error detecting container: %#v skipping.", err)
|
|
|
|
|
glog.Errorf("Error detecting container: %v skipping.", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if containerId == "" {
|
|
|
|
|
glog.Infof("%#v doesn't exist, creating", container)
|
|
|
|
|
glog.Infof("%+v doesn't exist, creating", container)
|
|
|
|
|
kl.DockerPuller.Pull(container.Image)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error pulling container: %#v", err)
|
|
|
|
|
glog.Errorf("Error pulling container: %v", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
containerId, err = kl.runContainer(&manifest, &container, "container:"+string(netId))
|
|
|
|
|
if err != nil {
|
|
|
|
|
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
|
|
|
|
glog.Errorf("Error creating container: %#v", err)
|
|
|
|
|
glog.Errorf("Error creating container: %v", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
glog.V(1).Infof("%#v exists as %v", container.Name, containerId)
|
|
|
|
|
glog.V(1).Infof("%s exists as %v", container.Name, containerId)
|
|
|
|
|
}
|
|
|
|
|
dockerIdsToKeep[containerId] = true
|
|
|
|
|
}
|
|
|
|
@@ -692,7 +692,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|
|
|
|
// Kill any containers we don't need
|
|
|
|
|
existingContainers, err := kl.getDockerContainers()
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error listing containers: %#v", err)
|
|
|
|
|
glog.Errorf("Error listing containers: %v", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
for id, container := range existingContainers {
|
|
|
|
@@ -700,7 +700,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|
|
|
|
glog.Infof("Killing: %s", id)
|
|
|
|
|
err = kl.killContainer(container)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Error killing container: %#v", err)
|
|
|
|
|
glog.Errorf("Error killing container: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -718,7 +718,7 @@ func (kl *Kubelet) RunSyncLoop(updateChannel <-chan manifestUpdate, handler Sync
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case u := <-updateChannel:
|
|
|
|
|
glog.Infof("Got configuration from %s: %#v", u.source, u.manifests)
|
|
|
|
|
glog.Infof("Got configuration from %s: %+v", u.source, u.manifests)
|
|
|
|
|
last[u.source] = u.manifests
|
|
|
|
|
case <-time.After(kl.SyncFrequency):
|
|
|
|
|
}
|
|
|
|
@@ -730,7 +730,7 @@ func (kl *Kubelet) RunSyncLoop(updateChannel <-chan manifestUpdate, handler Sync
|
|
|
|
|
|
|
|
|
|
err := handler.SyncManifests(manifests)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("Couldn't sync containers : %#v", err)
|
|
|
|
|
glog.Errorf("Couldn't sync containers : %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|