diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a8fc638d6f5..8d95fc96901 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -70,26 +70,26 @@ type Kubelet struct { // Starts background goroutines. If file, manifest_url, or address are empty, // they are not watched. Never returns. -func (sl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { +func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { fileChannel := make(chan api.ContainerManifest) etcdChannel := make(chan []api.ContainerManifest) httpChannel := make(chan api.ContainerManifest) serverChannel := make(chan api.ContainerManifest) - go util.Forever(func() { sl.WatchFile(file, fileChannel) }, 20*time.Second) + go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) if manifest_url != "" { - go util.Forever(func() { sl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) + go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) } if etcd_servers != "" { servers := []string{etcd_servers} log.Printf("Creating etcd client pointing to %v", servers) - sl.Client = etcd.NewClient(servers) - go util.Forever(func() { sl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second) + kl.Client = etcd.NewClient(servers) + go util.Forever(func() { kl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second) } if address != "" { log.Printf("Starting to listen on %s:%d", address, port) handler := KubeletServer{ - Kubelet: sl, + Kubelet: kl, UpdateChannel: serverChannel, } s := &http.Server{ @@ -102,7 +102,7 @@ func (sl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, } go util.Forever(func() { s.ListenAndServe() }, 0) } - sl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, sl) + kl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, kl) } // Interface implemented by Kubelet, for testability @@ -111,8 +111,8 @@ type SyncHandler interface { } // Log an event to the etcd backend. -func (sl *Kubelet) LogEvent(event *api.Event) error { - if sl.Client == nil { +func (kl *Kubelet) LogEvent(event *api.Event) error { + if kl.Client == nil { return fmt.Errorf("no etcd client connection.") } event.Timestamp = time.Now().Unix() @@ -122,7 +122,7 @@ func (sl *Kubelet) LogEvent(event *api.Event) error { } var response *etcd.Response - response, err = sl.Client.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */) + response, err = kl.Client.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */) // TODO(bburns) : examine response here. if err != nil { log.Printf("Error writing event: %s\n", err) @@ -135,8 +135,8 @@ func (sl *Kubelet) LogEvent(event *api.Event) error { // Does this container exist on this host? Returns true if so, and the name under which the container is running. // Returns an error if one occurs. -func (sl *Kubelet) ContainerExists(manifest *api.ContainerManifest, container *api.Container) (exists bool, foundName string, err error) { - containers, err := sl.ListContainers() +func (kl *Kubelet) ContainerExists(manifest *api.ContainerManifest, container *api.Container) (exists bool, foundName string, err error) { + containers, err := kl.ListContainers() if err != nil { return false, "", err } @@ -145,15 +145,15 @@ func (sl *Kubelet) ContainerExists(manifest *api.ContainerManifest, container *a if manifestId == manifest.Id && containerName == container.Name { // TODO(bburns) : This leads to an extra list. Convert this to use the returned ID and a straight call // to inspect - data, err := sl.GetContainerByName(name) + data, err := kl.GetContainerByName(name) return data != nil, name, err } } return false, "", nil } -func (sl *Kubelet) GetContainerID(name string) (string, error) { - containerList, err := sl.DockerClient.ListContainers(docker.ListContainersOptions{}) +func (kl *Kubelet) GetContainerID(name string) (string, error) { + containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{}) if err != nil { return "", err } @@ -167,17 +167,17 @@ func (sl *Kubelet) GetContainerID(name string) (string, error) { // Get a container by name. // returns the container data from Docker, or an error if one exists. -func (sl *Kubelet) GetContainerByName(name string) (*docker.Container, error) { - id, err := sl.GetContainerID(name) +func (kl *Kubelet) GetContainerByName(name string) (*docker.Container, error) { + id, err := kl.GetContainerID(name) if err != nil { return nil, err } - return sl.DockerClient.InspectContainer(id) + return kl.DockerClient.InspectContainer(id) } -func (sl *Kubelet) ListContainers() ([]string, error) { +func (kl *Kubelet) ListContainers() ([]string, error) { result := []string{} - containerList, err := sl.DockerClient.ListContainers(docker.ListContainersOptions{}) + containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{}) if err != nil { return result, err } @@ -187,9 +187,9 @@ func (sl *Kubelet) ListContainers() ([]string, error) { return result, err } -func (sl *Kubelet) pullImage(image string) error { - sl.pullLock.Lock() - defer sl.pullLock.Unlock() +func (kl *Kubelet) pullImage(image string) error { + kl.pullLock.Lock() + defer kl.pullLock.Unlock() cmd := exec.Command("docker", "pull", image) err := cmd.Start() if err != nil { @@ -236,8 +236,8 @@ func dockerNameToManifestAndContainer(name string) (manifestId, containerName st return } -func (sl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) { - err = sl.pullImage(container.Image) +func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) { + err = kl.pullImage(container.Image) if err != nil { return "", err } @@ -289,24 +289,24 @@ func (sl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api. Cmd: cmdList, }, } - dockerContainer, err := sl.DockerClient.CreateContainer(opts) + dockerContainer, err := kl.DockerClient.CreateContainer(opts) if err != nil { return "", err } - return name, sl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{ + return name, kl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{ PortBindings: portBindings, Binds: binds, }) } -func (sl *Kubelet) KillContainer(name string) error { - id, err := sl.GetContainerID(name) +func (kl *Kubelet) KillContainer(name string) error { + id, err := kl.GetContainerID(name) if err != nil { return err } - err = sl.DockerClient.StopContainer(id, 10) + err = kl.DockerClient.StopContainer(id, 10) manifestId, containerName := dockerNameToManifestAndContainer(name) - sl.LogEvent(&api.Event{ + kl.LogEvent(&api.Event{ Event: "STOP", Manifest: &api.ContainerManifest{ Id: manifestId, @@ -321,17 +321,17 @@ func (sl *Kubelet) KillContainer(name string) error { // Watch a file for changes to the set of tasks that should run on this Kubelet // This function loops forever and is intended to be run as a goroutine -func (sl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) { +func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) { var lastData []byte for { - time.Sleep(sl.FileCheckFrequency) + time.Sleep(kl.FileCheckFrequency) var manifest api.ContainerManifest data, err := ioutil.ReadFile(file) if err != nil { log.Printf("Couldn't read file: %s : %v", file, err) continue } - if err = sl.ExtractYAMLData(data, &manifest); err != nil { + if err = kl.ExtractYAMLData(data, &manifest); err != nil { continue } if !bytes.Equal(lastData, data) { @@ -346,13 +346,13 @@ func (sl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerMani // Watch an HTTP endpoint for changes to the set of tasks that should run on this Kubelet // This function runs forever and is intended to be run as a goroutine -func (sl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) { +func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) { var lastData []byte client := &http.Client{} for { - time.Sleep(sl.HTTPCheckFrequency) + time.Sleep(kl.HTTPCheckFrequency) var config api.ContainerManifest - data, err := sl.SyncHTTP(client, url, &config) + data, err := kl.SyncHTTP(client, url, &config) log.Printf("Containers: %#v", config) if err != nil { log.Printf("Error syncing HTTP: %#v", err) @@ -369,7 +369,7 @@ func (sl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManif // SyncHTTP reads from url a yaml manifest and populates config. Returns the // raw bytes, if something was read. Returns an error if something goes wrong. // 'client' is used to execute the request, to allow caching of clients. -func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) { request, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err @@ -383,7 +383,7 @@ func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.Contain if err != nil { return nil, err } - if err = sl.ExtractYAMLData(body, &config); err != nil { + if err = kl.ExtractYAMLData(body, &config); err != nil { return body, err } return body, nil @@ -391,17 +391,17 @@ func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.Contain // Take an etcd Response object, and turn it into a structured list of containers // Return a list of containers, or an error if one occurs. -func (sl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) { +func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) { if response.Node == nil || len(response.Node.Value) == 0 { return nil, fmt.Errorf("no nodes field: %#v", response) } var manifests []api.ContainerManifest - err := sl.ExtractYAMLData([]byte(response.Node.Value), &manifests) + err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests) return manifests, err } -func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error { - response, err := sl.Client.Get(key+"/kubelet", true, false) +func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error { + response, err := kl.Client.Get(key+"/kubelet", true, false) if err != nil { log.Printf("Error on get on %s: %#v", key, err) switch err.(type) { @@ -413,7 +413,7 @@ func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap } return err } - manifests, err := sl.ResponseToManifests(response) + manifests, err := kl.ResponseToManifests(response) if err != nil { log.Printf("Error parsing response (%#v): %s", response, err) return err @@ -426,7 +426,7 @@ func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap // Sync with etcd, and set up an etcd watch for new configurations // The channel to send new configurations across // This function loops forever and is intended to be run in a go routine. -func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) { +func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) { hostname, err := exec.Command("hostname", "-f").Output() if err != nil { log.Printf("Couldn't determine hostname : %v", err) @@ -435,7 +435,7 @@ func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerMan key := "/registry/hosts/" + strings.TrimSpace(string(hostname)) // First fetch the initial configuration (watch only gives changes...) for { - err = sl.getKubeletStateFromEtcd(key, changeChannel) + err = kl.getKubeletStateFromEtcd(key, changeChannel) if err == nil { // We got a successful response, etcd is up, set up the watch. break @@ -444,23 +444,23 @@ func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerMan } done := make(chan bool) - go util.Forever(func() { sl.TimeoutWatch(done) }, 0) + go util.Forever(func() { kl.TimeoutWatch(done) }, 0) for { // The etcd client will close the watch channel when it exits. So we need // to create and service a new one every time. watchChannel := make(chan *etcd.Response) // We don't push this through Forever because if it dies, we just do it again in 30 secs. // anyway. - go sl.WatchEtcd(watchChannel, changeChannel) + go kl.WatchEtcd(watchChannel, changeChannel) - sl.getKubeletStateFromEtcd(key, changeChannel) + kl.getKubeletStateFromEtcd(key, changeChannel) log.Printf("Setting up a watch for configuration changes in etcd for %s", key) - sl.Client.Watch(key, 0, true, watchChannel, done) + kl.Client.Watch(key, 0, true, watchChannel, done) } } // Timeout the watch after 30 seconds -func (sl *Kubelet) TimeoutWatch(done chan bool) { +func (kl *Kubelet) TimeoutWatch(done chan bool) { t := time.Tick(30 * time.Second) for _ = range t { done <- true @@ -468,7 +468,7 @@ func (sl *Kubelet) TimeoutWatch(done chan bool) { } // Extract data from YAML file into a list of containers. -func (sl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error { +func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error { err := yaml.Unmarshal(buf, output) if err != nil { log.Printf("Couldn't unmarshal configuration: %v", err) @@ -479,7 +479,7 @@ func (sl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error { // Watch etcd for changes, receives config objects from the etcd client watch. // This function loops forever and is intended to be run as a goroutine. -func (sl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) { +func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) { defer util.HandleCrash() for { watchResponse := <-watchChannel @@ -498,7 +498,7 @@ func (sl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel c } log.Printf("Got data: %v", watchResponse.Node.Value) var manifests []api.ContainerManifest - if err := sl.ExtractYAMLData([]byte(watchResponse.Node.Value), &manifests); err != nil { + if err := kl.ExtractYAMLData([]byte(watchResponse.Node.Value), &manifests); err != nil { continue } // Ok, we have a valid configuration, send to channel for @@ -508,21 +508,21 @@ func (sl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel c } // Sync the configured list of containers (desired state) with the host current state -func (sl *Kubelet) SyncManifests(config []api.ContainerManifest) error { +func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { log.Printf("Desired:%#v", config) var err error desired := map[string]bool{} for _, manifest := range config { for _, element := range manifest.Containers { var exists bool - exists, actualName, err := sl.ContainerExists(&manifest, &element) + exists, actualName, err := kl.ContainerExists(&manifest, &element) if err != nil { log.Printf("Error detecting container: %#v skipping.", err) continue } if !exists { log.Printf("%#v doesn't exist, creating", element) - actualName, err = sl.RunContainer(&manifest, &element) + actualName, err = kl.RunContainer(&manifest, &element) // For some reason, list gives back names that start with '/' actualName = "/" + actualName @@ -538,12 +538,12 @@ func (sl *Kubelet) SyncManifests(config []api.ContainerManifest) error { desired[actualName] = true } } - existingContainers, _ := sl.ListContainers() + existingContainers, _ := kl.ListContainers() log.Printf("Existing:\n%#v Desired: %#v", existingContainers, desired) for _, container := range existingContainers { if !desired[container] { log.Printf("Killing: %s", container) - err = sl.KillContainer(container) + err = kl.KillContainer(container) if err != nil { log.Printf("Error killing container: %#v", err) } @@ -558,7 +558,7 @@ func (sl *Kubelet) SyncManifests(config []api.ContainerManifest) error { // no changes are seen to the configuration, will synchronize the last known desired // state every sync_frequency seconds. // Never returns. -func (sl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) { +func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) { var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest for { select { @@ -574,7 +574,7 @@ func (sl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC case manifest := <-serverChannel: log.Printf("Got new manifest from our server... %v", manifest) lastServer = []api.ContainerManifest{manifest} - case <-time.After(sl.SyncFrequency): + case <-time.After(kl.SyncFrequency): } manifests := append([]api.ContainerManifest{}, lastFile...) @@ -588,8 +588,8 @@ func (sl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC } } -func (sl *Kubelet) GetContainerInfo(name string) (string, error) { - info, err := sl.DockerClient.InspectContainer(name) +func (kl *Kubelet) GetContainerInfo(name string) (string, error) { + info, err := kl.DockerClient.InspectContainer(name) if err != nil { return "{}", err }