diff --git a/pkg/kubelet/cadvisor.go b/pkg/kubelet/cadvisor.go new file mode 100644 index 00000000000..427f1cce220 --- /dev/null +++ b/pkg/kubelet/cadvisor.go @@ -0,0 +1,76 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + cadvisor "github.com/google/cadvisor/info" +) + +// cadvisorInterface is an abstract interface for testability. It abstracts the interface of "github.com/google/cadvisor/client".Client. +type cadvisorInterface interface { + ContainerInfo(name string, req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error) + MachineInfo() (*cadvisor.MachineInfo, error) +} + +// This method takes a container's absolute path and returns the stats for the +// container. The container's absolute path refers to its hierarchy in the +// cgroup file system. e.g. The root container, which represents the whole +// machine, has path "/"; all docker containers have path "/docker/" +func (kl *Kubelet) statsFromContainerPath(cc cadvisorInterface, containerPath string, req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error) { + cinfo, err := cc.ContainerInfo(containerPath, req) + if err != nil { + return nil, err + } + return cinfo, nil +} + +// GetContainerInfo returns stats (from Cadvisor) for a container. +func (kl *Kubelet) GetContainerInfo(podFullName, uuid, containerName string, req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error) { + cc := kl.GetCadvisorClient() + if cc == nil { + return nil, nil + } + dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) + if err != nil { + return nil, err + } + dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, containerName) + if !found { + return nil, fmt.Errorf("couldn't find container") + } + return kl.statsFromContainerPath(cc, fmt.Sprintf("/docker/%s", dockerContainer.ID), req) +} + +// GetRootInfo returns stats (from Cadvisor) of current machine (root container). +func (kl *Kubelet) GetRootInfo(req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error) { + cc := kl.GetCadvisorClient() + if cc == nil { + return nil, fmt.Errorf("no cadvisor connection") + } + return kl.statsFromContainerPath(cc, "/", req) +} + +func (kl *Kubelet) GetMachineInfo() (*cadvisor.MachineInfo, error) { + cc := kl.GetCadvisorClient() + if cc == nil { + return nil, fmt.Errorf("no cadvisor connection") + } + return cc.MachineInfo() +} diff --git a/pkg/kubelet/handlers.go b/pkg/kubelet/handlers.go index c420d687f2d..2b02e5960be 100644 --- a/pkg/kubelet/handlers.go +++ b/pkg/kubelet/handlers.go @@ -39,7 +39,7 @@ func (e *execActionHandler) Run(podFullName, uuid string, container *api.Contain type httpActionHandler struct { kubelet *Kubelet - client httpGetInterface + client httpGetter } // ResolvePort attempts to turn a IntOrString port reference into a concrete port number. 
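Note on the new file: cadvisor.go gathers the kubelet's cadvisor-facing surface in one place — the unexported cadvisorInterface used for testability, plus the exported GetContainerInfo, GetRootInfo, and GetMachineInfo accessors. As rough orientation, here is a hedged sketch of how a caller outside this diff (for example the kubelet's HTTP handlers) might drive those accessors. The summarizeStats helper is purely illustrative, and the MachineInfo.NumCores and ContainerInfo.Stats field names are assumptions about the cadvisor info API of this era, not something shown in the patch.

```go
package kubelet

import (
	"fmt"

	cadvisor "github.com/google/cadvisor/info"
)

// summarizeStats is an illustrative helper (not part of this change). It asks
// the kubelet for machine info and for the latest stats sample of a single
// container in a pod, using the accessors added in cadvisor.go.
func summarizeStats(kl *Kubelet, podFullName, uuid, containerName string) (string, error) {
	machine, err := kl.GetMachineInfo()
	if err != nil {
		return "", err
	}
	req := &cadvisor.ContainerInfoRequest{NumStats: 1}
	cinfo, err := kl.GetContainerInfo(podFullName, uuid, containerName, req)
	if err != nil {
		return "", err
	}
	if cinfo == nil {
		// Per cadvisor.go above, GetContainerInfo returns (nil, nil) when no
		// cadvisor client has been set on the kubelet.
		return "", fmt.Errorf("stats unavailable: no cadvisor client")
	}
	// NumCores and Stats are assumed field names from the cadvisor info package.
	return fmt.Sprintf("%d cores, %d stats sample(s) for %s/%s",
		machine.NumCores, len(cinfo.Stats), podFullName, containerName), nil
}
```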
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7938e086bda..fe0e5833cbc 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -17,7 +17,6 @@ limitations under the License. package kubelet import ( - "errors" "fmt" "io" "net/http" @@ -37,7 +36,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" "github.com/fsouza/go-dockerclient" "github.com/golang/glog" - "github.com/google/cadvisor/info" ) const defaultChanSize = 1024 @@ -47,12 +45,6 @@ const minShares = 2 const sharesPerCPU = 1024 const milliCPUToCPU = 1000 -// CadvisorInterface is an abstract interface for testability. It abstracts the interface of "github.com/google/cadvisor/client".Client. -type CadvisorInterface interface { - ContainerInfo(name string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) - MachineInfo() (*info.MachineInfo, error) -} - // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { SyncPods([]api.BoundPod) error @@ -99,7 +91,7 @@ func NewIntegrationTestKubelet(hn string, rd string, dc dockertools.DockerInterf } } -type httpGetInterface interface { +type httpGetter interface { Get(url string) (*http.Response, error) } @@ -124,26 +116,26 @@ type Kubelet struct { // Optional, defaults to simple Docker implementation runner dockertools.ContainerCommandRunner // Optional, client for http requests, defaults to empty client - httpClient httpGetInterface + httpClient httpGetter // Optional, maximum pull QPS from the docker registry, 0.0 means unlimited. pullQPS float32 // Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0 pullBurst int // Optional, no statistics will be available if omitted - cadvisorClient CadvisorInterface + cadvisorClient cadvisorInterface cadvisorLock sync.RWMutex } // SetCadvisorClient sets the cadvisor client in a thread-safe way. -func (kl *Kubelet) SetCadvisorClient(c CadvisorInterface) { +func (kl *Kubelet) SetCadvisorClient(c cadvisorInterface) { kl.cadvisorLock.Lock() defer kl.cadvisorLock.Unlock() kl.cadvisorClient = c } // GetCadvisorClient gets the cadvisor client. -func (kl *Kubelet) GetCadvisorClient() CadvisorInterface { +func (kl *Kubelet) GetCadvisorClient() cadvisorInterface { kl.cadvisorLock.RLock() defer kl.cadvisorLock.RUnlock() return kl.cadvisorClient @@ -417,13 +409,15 @@ func (kl *Kubelet) createNetworkContainer(pod *api.BoundPod) (dockertools.Docker return kl.runContainer(pod, container, nil, "") } -// Delete all containers in a pod (except the network container) returns the number of containers deleted -// and an error if one occurs. -func (kl *Kubelet) deleteAllContainers(pod *api.BoundPod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) { +// Kill all containers in a pod. Returns the number of containers deleted and an error if one occurs. +func (kl *Kubelet) killContainersInPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) (int, error) { + podFullName := GetPodFullName(pod) + count := 0 errs := make(chan error, len(pod.Spec.Containers)) wg := sync.WaitGroup{} for _, container := range pod.Spec.Containers { + // TODO: Consider being more aggressive: kill all containers with this pod UID, period. 
if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found { count++ wg.Add(1) @@ -459,22 +453,21 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke // Make sure we have a network container var netID dockertools.DockerID - if networkDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found { - netID = dockertools.DockerID(networkDockerContainer.ID) + if netDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found { + netID = dockertools.DockerID(netDockerContainer.ID) } else { - glog.V(3).Infof("Network container doesn't exist, creating") - count, err := kl.deleteAllContainers(pod, podFullName, dockerContainers) + glog.V(3).Infof("Network container doesn't exist for pod %q, re-creating the pod", podFullName) + count, err := kl.killContainersInPod(pod, dockerContainers) if err != nil { return err } - dockerNetworkID, err := kl.createNetworkContainer(pod) + netID, err = kl.createNetworkContainer(pod) if err != nil { glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName) return err } - netID = dockerNetworkID if count > 0 { - // relist everything, otherwise we'll think we're ok + // Re-list everything, otherwise we'll think we're ok. dockerContainers, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false) if err != nil { glog.Errorf("Error listing containers %#v", dockerContainers) @@ -486,15 +479,14 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke podVolumes, err := kl.mountExternalVolumes(pod) if err != nil { - glog.Errorf("Unable to mount volumes for pod %s: (%v) Skipping pod.", podFullName, err) + glog.Errorf("Unable to mount volumes for pod %s: (%v), skipping pod", podFullName, err) return err } podState := api.PodState{} info, err := kl.GetPodInfo(podFullName, uuid) if err != nil { - glog.Errorf("Unable to get pod with name %s and uuid %s info, health checks may be invalid.", - podFullName, uuid) + glog.Errorf("Unable to get pod with name %s and uuid %s info, health checks may be invalid", podFullName, uuid) } netInfo, found := info[networkContainerName] if found { @@ -586,6 +578,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke _, keep := containersToKeep[id] _, killed := killedContainers[id] if !keep && !killed { + glog.V(1).Infof("Killing unwanted container in pod %q: %+v", curUUID, container) err = kl.killContainer(container) if err != nil { glog.Errorf("Error killing container: %v", err) @@ -638,13 +631,14 @@ func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error { // SyncPods synchronizes the configured list of pods (desired state) with the host current state. 
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { - glog.V(4).Infof("Desired [%s]: %+v", kl.hostname, pods) + glog.V(4).Infof("Desired: %#v", pods) var err error desiredContainers := make(map[podContainer]empty) + desiredPods := make(map[string]empty) dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) if err != nil { - glog.Errorf("Error listing containers %#v", dockerContainers) + glog.Errorf("Error listing containers: %#v", dockerContainers) return err } @@ -653,6 +647,7 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { pod := &pods[ix] podFullName := GetPodFullName(pod) uuid := pod.UID + desiredPods[uuid] = empty{} // Add all containers (including net) to the map. desiredContainers[podContainer{podFullName, uuid, networkContainerName}] = empty{} @@ -664,24 +659,25 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { kl.podWorkers.Run(podFullName, func() { err := kl.syncPod(pod, dockerContainers) if err != nil { - glog.Errorf("Error syncing pod: %v skipping.", err) + glog.Errorf("Error syncing pod, skipping: %s", err) } }) } - // Kill any containers we don't need - existingContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) - if err != nil { - glog.Errorf("Error listing containers: %v", err) - return err - } - for _, container := range existingContainers { + // Kill any containers we don't need. + for _, container := range dockerContainers { // Don't kill containers that are in the desired pods. podFullName, uuid, containerName, _ := dockertools.ParseDockerName(container.Names[0]) - if _, ok := desiredContainers[podContainer{podFullName, uuid, containerName}]; !ok { + if _, found := desiredPods[uuid]; found { + // syncPod() will handle this one. + continue + } + pc := podContainer{podFullName, uuid, containerName} + if _, ok := desiredContainers[pc]; !ok { + glog.V(1).Infof("Killing unwanted container %+v", pc) err = kl.killContainer(container) if err != nil { - glog.Errorf("Error killing container: %v", err) + glog.Errorf("Error killing container %+v: %s", pc, err) } } } @@ -700,7 +696,7 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { for i := range pods { pod := &pods[i] if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 { - glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs) + glog.Warningf("Pod %s: HostPort is already allocated, ignoring: %s", GetPodFullName(pod), errs) continue } filtered = append(filtered, *pod) @@ -720,7 +716,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { case u := <-updates: switch u.Op { case SET, UPDATE: - glog.V(3).Infof("Containers changed [%s]", kl.hostname) + glog.V(3).Infof("Containers changed") kl.pods = u.Pods kl.pods = filterHostPortConflicts(kl.pods) @@ -728,6 +724,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { panic("syncLoop does not support incremental changes") } case <-time.After(kl.resyncInterval): + glog.V(4).Infof("Periodic sync") if kl.pods == nil { continue } @@ -735,30 +732,11 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { err := handler.SyncPods(kl.pods) if err != nil { - glog.Errorf("Couldn't sync containers : %v", err) + glog.Errorf("Couldn't sync containers: %s", err) } } } -func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.ContainerInfoRequest { - ret := &info.ContainerInfoRequest{ - NumStats: req.NumStats, - } - return 
ret -} - -// This method takes a container's absolute path and returns the stats for the -// container. The container's absolute path refers to its hierarchy in the -// cgroup file system. e.g. The root container, which represents the whole -// machine, has path "/"; all docker containers have path "/docker/" -func (kl *Kubelet) statsFromContainerPath(cc CadvisorInterface, containerPath string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { - cinfo, err := cc.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req)) - if err != nil { - return nil, err - } - return cinfo, nil -} - // GetKubeletContainerLogs returns logs from the container // The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error { @@ -789,40 +767,6 @@ func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) { return dockertools.GetDockerPodInfo(kl.dockerClient, manifest, podFullName, uuid) } -// GetContainerInfo returns stats (from Cadvisor) for a container. -func (kl *Kubelet) GetContainerInfo(podFullName, uuid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { - cc := kl.GetCadvisorClient() - if cc == nil { - return nil, nil - } - dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) - if err != nil { - return nil, err - } - dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, containerName) - if !found { - return nil, errors.New("couldn't find container") - } - return kl.statsFromContainerPath(cc, fmt.Sprintf("/docker/%s", dockerContainer.ID), req) -} - -// GetRootInfo returns stats (from Cadvisor) of current machine (root container). -func (kl *Kubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { - cc := kl.GetCadvisorClient() - if cc == nil { - return nil, fmt.Errorf("no cadvisor connection") - } - return kl.statsFromContainerPath(cc, "/", req) -} - -func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) { - cc := kl.GetCadvisorClient() - if cc == nil { - return nil, fmt.Errorf("no cadvisor connection") - } - return cc.MachineInfo() -} - func (kl *Kubelet) healthy(podFullName, podUUID string, currentState api.PodState, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) { // Give the container 60 seconds to start up. if container.LivenessProbe == nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 0ca6174dfa3..3f1eea6061b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -183,7 +183,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "list"}) + verifyCalls(t, fakeDocker, []string{"list"}) } // drainWorkers waits until all workers are done. Should only used for testing. 
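For context on the dropped leading "list" in the expected call sequences above and below: after this change SyncPods takes a single docker listing up front, reuses that listing for the final kill pass instead of listing again, and skips any container whose pod UID is still in the desired set (those are left to syncPod, running in a pod worker). A condensed, self-contained sketch of that filtering pass, using simplified stand-in types rather than the real kubelet/dockertools ones:

```go
package main

import "fmt"

// Simplified stand-ins for the kubelet's podContainer/empty types and the
// docker container listing; not the real kubelet or dockertools types.
type containerKey struct {
	podFullName, uuid, containerName string
}

type nothing struct{}

// killUnwanted models the kill pass at the end of SyncPods: containers whose
// pod UID is still desired are skipped (syncPod handles them); anything else
// not in the desired container set gets killed.
func killUnwanted(existing []containerKey, desiredPods map[string]nothing, desiredContainers map[containerKey]nothing) []containerKey {
	var killed []containerKey
	for _, c := range existing {
		if _, found := desiredPods[c.uuid]; found {
			continue // syncPod() will handle this one
		}
		if _, ok := desiredContainers[c]; !ok {
			killed = append(killed, c)
		}
	}
	return killed
}

func main() {
	desiredPods := map[string]nothing{"uid-1": {}}
	desiredContainers := map[containerKey]nothing{
		{"foo.test", "uid-1", "net"}: {},
		{"foo.test", "uid-1", "bar"}: {},
	}
	existing := []containerKey{
		{"foo.test", "uid-1", "bar"}, // desired pod: left to syncPod
		{"old.test", "uid-0", "baz"}, // no longer desired: killed
	}
	fmt.Println(killUnwanted(existing, desiredPods, desiredContainers)) // [{old.test uid-0 baz}]
}
```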
@@ -231,7 +231,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "create", "start", "list", "inspect_container", "list", "create", "start"}) + "list", "create", "start", "list", "inspect_container", "list", "create", "start"}) fakeDocker.Lock() @@ -279,7 +279,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "create", "start", "list", "inspect_container", "list", "create", "start"}) + "list", "create", "start", "list", "inspect_container", "list", "create", "start"}) fakeDocker.Lock() @@ -324,7 +324,7 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) { kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "list", "create", "start"}) + "list", "list", "inspect_container", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -376,7 +376,7 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) { kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "list", "create", "start"}) + "list", "list", "inspect_container", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -418,7 +418,7 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) { kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "stop", "create", "start", "list", "list", "inspect_container", "list", "create", "start"}) + "list", "stop", "create", "start", "list", "list", "inspect_container", "list", "create", "start"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -455,7 +455,7 @@ func TestSyncPodsDeletes(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -847,8 +847,7 @@ func TestGetContainerInfo(t *testing.T) { } mockCadvisor := &mockCadvisorClient{} - req := &info.ContainerInfoRequest{} - cadvisorReq := getCadvisorContainerInfoRequest(req) + cadvisorReq := &info.ContainerInfoRequest{} mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil) kubelet, _, fakeDocker := newTestKubelet(t) @@ -862,7 +861,7 @@ func TestGetContainerInfo(t *testing.T) { }, } - stats, err := kubelet.GetContainerInfo("qux", "", "foo", req) + stats, err := kubelet.GetContainerInfo("qux", "", "foo", cadvisorReq) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -882,8 +881,7 @@ func TestGetRootInfo(t *testing.T) { fakeDocker := dockertools.FakeDockerClient{} mockCadvisor := &mockCadvisorClient{} - req := &info.ContainerInfoRequest{} - cadvisorReq := getCadvisorContainerInfoRequest(req) + cadvisorReq := &info.ContainerInfoRequest{} mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil) kubelet := Kubelet{ @@ -894,7 +892,7 @@ func TestGetRootInfo(t *testing.T) { } // If the container name is an empty string, then it means the root container. 
- _, err := kubelet.GetRootInfo(req) + _, err := kubelet.GetRootInfo(cadvisorReq) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -925,8 +923,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { containerInfo := &info.ContainerInfo{} mockCadvisor := &mockCadvisorClient{} - req := &info.ContainerInfoRequest{} - cadvisorReq := getCadvisorContainerInfoRequest(req) + cadvisorReq := &info.ContainerInfoRequest{} expectedErr := fmt.Errorf("some error") mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, expectedErr) @@ -941,7 +938,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { }, } - stats, err := kubelet.GetContainerInfo("qux", "uuid", "foo", req) + stats, err := kubelet.GetContainerInfo("qux", "uuid", "foo", cadvisorReq) if stats != nil { t.Errorf("non-nil stats on error") }
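The tests above rely on a mockCadvisorClient whose definition is not part of this diff. Judging from the mockCadvisor.On(...).Return(...) calls, it is a testify-style mock; a minimal sketch of what it might look like, assuming github.com/stretchr/testify/mock, is:

```go
package kubelet

import (
	"github.com/stretchr/testify/mock"

	"github.com/google/cadvisor/info"
)

// mockCadvisorClient is a sketch of the testify-based mock the tests appear
// to use; the real definition lives elsewhere in kubelet_test.go and may differ.
type mockCadvisorClient struct {
	mock.Mock
}

// ContainerInfo records the call and returns whatever the test set up via On().
func (c *mockCadvisorClient) ContainerInfo(name string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
	args := c.Called(name, req)
	return args.Get(0).(*info.ContainerInfo), args.Error(1)
}

// MachineInfo records the call and returns whatever the test set up via On().
func (c *mockCadvisorClient) MachineInfo() (*info.MachineInfo, error) {
	args := c.Called()
	return args.Get(0).(*info.MachineInfo), args.Error(1)
}

// Compile-time check that the mock satisfies the kubelet's cadvisorInterface.
var _ cadvisorInterface = &mockCadvisorClient{}
```

With that shape, mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil) in the tests lines up with the Called/Get/Error plumbing in the sketch, and the mock satisfies the unexported cadvisorInterface introduced in cadvisor.go.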