Merge pull request #1925 from thockin/klet

Accumulated cleanups to Kubelet
This commit is contained in:
Clayton Coleman 2014-10-24 11:20:29 -04:00
commit b5da5889e4
4 changed files with 128 additions and 111 deletions

76
pkg/kubelet/cadvisor.go Normal file
View File

@ -0,0 +1,76 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
cadvisor "github.com/google/cadvisor/info"
)
// cadvisorInterface is an abstract interface for testability: the Kubelet
// talks to cadvisor only through it, so a fake client can be substituted.
// It abstracts the interface of "github.com/google/cadvisor/client".Client.
type cadvisorInterface interface {
// ContainerInfo returns information about the container identified by
// name, as selected by req. In this file, name is a container path such
// as "/" or "/docker/<docker id>".
ContainerInfo(name string, req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error)
// MachineInfo returns the cadvisor MachineInfo record, or an error.
MachineInfo() (*cadvisor.MachineInfo, error)
}
// statsFromContainerPath asks the given cadvisor client for the stats of the
// container located at containerPath. The path is the container's absolute
// position in the cgroup file system hierarchy: the root container, which
// represents the whole machine, is "/", and every docker container lives at
// "/docker/<docker id>".
//
// If the client reports an error, that error is returned unchanged and the
// stats result is always nil.
func (kl *Kubelet) statsFromContainerPath(cc cadvisorInterface, containerPath string, req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error) {
	stats, err := cc.ContainerInfo(containerPath, req)
	if err == nil {
		return stats, nil
	}
	// Discard whatever the client returned alongside the error so callers
	// never observe stats together with a failure.
	return nil, err
}
// GetContainerInfo returns stats (from Cadvisor) for one container of a pod.
//
// The container is located by podFullName, uuid and containerName among the
// containers reported by dockertools.GetKubeletDockerContainers, and its
// stats are then requested from cadvisor under "/docker/<docker id>".
//
// Returns:
//   - (nil, nil) when no cadvisor client is configured;
//   - (nil, err) when listing containers fails, when the container cannot be
//     found, or when the cadvisor query fails;
//   - the container's stats otherwise.
func (kl *Kubelet) GetContainerInfo(podFullName, uuid, containerName string, req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error) {
	client := kl.GetCadvisorClient()
	if client == nil {
		// NOTE(review): unlike GetRootInfo and GetMachineInfo, a missing
		// client is reported here as "no stats" rather than as an error.
		// Presumably callers rely on this; confirm before changing.
		return nil, nil
	}

	containers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
	if err != nil {
		return nil, err
	}

	target, ok, _ := containers.FindPodContainer(podFullName, uuid, containerName)
	if !ok {
		return nil, fmt.Errorf("couldn't find container")
	}

	cgroupPath := fmt.Sprintf("/docker/%s", target.ID)
	return kl.statsFromContainerPath(client, cgroupPath, req)
}
// GetRootInfo returns stats (from Cadvisor) for the root container "/",
// which represents the current machine as a whole. It fails with an error
// when no cadvisor client is configured.
func (kl *Kubelet) GetRootInfo(req *cadvisor.ContainerInfoRequest) (*cadvisor.ContainerInfo, error) {
	if client := kl.GetCadvisorClient(); client != nil {
		return kl.statsFromContainerPath(client, "/", req)
	}
	return nil, fmt.Errorf("no cadvisor connection")
}
// GetMachineInfo returns the machine information reported by the cadvisor
// client's MachineInfo call. It fails with an error when no cadvisor client
// is configured.
func (kl *Kubelet) GetMachineInfo() (*cadvisor.MachineInfo, error) {
	client := kl.GetCadvisorClient()
	if client != nil {
		return client.MachineInfo()
	}
	return nil, fmt.Errorf("no cadvisor connection")
}

View File

@ -39,7 +39,7 @@ func (e *execActionHandler) Run(podFullName, uuid string, container *api.Contain
type httpActionHandler struct {
kubelet *Kubelet
client httpGetInterface
client httpGetter
}
// ResolvePort attempts to turn an IntOrString port reference into a concrete port number.

View File

@ -17,7 +17,6 @@ limitations under the License.
package kubelet
import (
"errors"
"fmt"
"io"
"net/http"
@ -37,7 +36,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/google/cadvisor/info"
)
const defaultChanSize = 1024
@ -47,12 +45,6 @@ const minShares = 2
const sharesPerCPU = 1024
const milliCPUToCPU = 1000
// CadvisorInterface is an abstract interface for testability. It abstracts the interface of "github.com/google/cadvisor/client".Client.
type CadvisorInterface interface {
ContainerInfo(name string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
MachineInfo() (*info.MachineInfo, error)
}
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]api.BoundPod) error
@ -99,7 +91,7 @@ func NewIntegrationTestKubelet(hn string, rd string, dc dockertools.DockerInterf
}
}
type httpGetInterface interface {
type httpGetter interface {
Get(url string) (*http.Response, error)
}
@ -124,26 +116,26 @@ type Kubelet struct {
// Optional, defaults to simple Docker implementation
runner dockertools.ContainerCommandRunner
// Optional, client for http requests, defaults to empty client
httpClient httpGetInterface
httpClient httpGetter
// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
pullQPS float32
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
pullBurst int
// Optional, no statistics will be available if omitted
cadvisorClient CadvisorInterface
cadvisorClient cadvisorInterface
cadvisorLock sync.RWMutex
}
// SetCadvisorClient sets the cadvisor client in a thread-safe way.
func (kl *Kubelet) SetCadvisorClient(c CadvisorInterface) {
func (kl *Kubelet) SetCadvisorClient(c cadvisorInterface) {
kl.cadvisorLock.Lock()
defer kl.cadvisorLock.Unlock()
kl.cadvisorClient = c
}
// GetCadvisorClient gets the cadvisor client.
func (kl *Kubelet) GetCadvisorClient() CadvisorInterface {
func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
kl.cadvisorLock.RLock()
defer kl.cadvisorLock.RUnlock()
return kl.cadvisorClient
@ -417,13 +409,15 @@ func (kl *Kubelet) createNetworkContainer(pod *api.BoundPod) (dockertools.Docker
return kl.runContainer(pod, container, nil, "")
}
// Delete all containers in a pod (except the network container) returns the number of containers deleted
// and an error if one occurs.
func (kl *Kubelet) deleteAllContainers(pod *api.BoundPod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) {
// Kill all containers in a pod. Returns the number of containers deleted and an error if one occurs.
func (kl *Kubelet) killContainersInPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) (int, error) {
podFullName := GetPodFullName(pod)
count := 0
errs := make(chan error, len(pod.Spec.Containers))
wg := sync.WaitGroup{}
for _, container := range pod.Spec.Containers {
// TODO: Consider being more aggressive: kill all containers with this pod UID, period.
if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found {
count++
wg.Add(1)
@ -459,22 +453,21 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
// Make sure we have a network container
var netID dockertools.DockerID
if networkDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
netID = dockertools.DockerID(networkDockerContainer.ID)
if netDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
netID = dockertools.DockerID(netDockerContainer.ID)
} else {
glog.V(3).Infof("Network container doesn't exist, creating")
count, err := kl.deleteAllContainers(pod, podFullName, dockerContainers)
glog.V(3).Infof("Network container doesn't exist for pod %q, re-creating the pod", podFullName)
count, err := kl.killContainersInPod(pod, dockerContainers)
if err != nil {
return err
}
dockerNetworkID, err := kl.createNetworkContainer(pod)
netID, err = kl.createNetworkContainer(pod)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName)
return err
}
netID = dockerNetworkID
if count > 0 {
// relist everything, otherwise we'll think we're ok
// Re-list everything, otherwise we'll think we're ok.
dockerContainers, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
@ -486,15 +479,14 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
glog.Errorf("Unable to mount volumes for pod %s: (%v) Skipping pod.", podFullName, err)
glog.Errorf("Unable to mount volumes for pod %s: (%v), skipping pod", podFullName, err)
return err
}
podState := api.PodState{}
info, err := kl.GetPodInfo(podFullName, uuid)
if err != nil {
glog.Errorf("Unable to get pod with name %s and uuid %s info, health checks may be invalid.",
podFullName, uuid)
glog.Errorf("Unable to get pod with name %s and uuid %s info, health checks may be invalid", podFullName, uuid)
}
netInfo, found := info[networkContainerName]
if found {
@ -586,6 +578,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
_, keep := containersToKeep[id]
_, killed := killedContainers[id]
if !keep && !killed {
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", curUUID, container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
@ -638,13 +631,14 @@ func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error {
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
glog.V(4).Infof("Desired [%s]: %+v", kl.hostname, pods)
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
desiredPods := make(map[string]empty)
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
glog.Errorf("Error listing containers: %#v", dockerContainers)
return err
}
@ -653,6 +647,7 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
pod := &pods[ix]
podFullName := GetPodFullName(pod)
uuid := pod.UID
desiredPods[uuid] = empty{}
// Add all containers (including net) to the map.
desiredContainers[podContainer{podFullName, uuid, networkContainerName}] = empty{}
@ -664,24 +659,25 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
kl.podWorkers.Run(podFullName, func() {
err := kl.syncPod(pod, dockerContainers)
if err != nil {
glog.Errorf("Error syncing pod: %v skipping.", err)
glog.Errorf("Error syncing pod, skipping: %s", err)
}
})
}
// Kill any containers we don't need
existingContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers: %v", err)
return err
}
for _, container := range existingContainers {
// Kill any containers we don't need.
for _, container := range dockerContainers {
// Don't kill containers that are in the desired pods.
podFullName, uuid, containerName, _ := dockertools.ParseDockerName(container.Names[0])
if _, ok := desiredContainers[podContainer{podFullName, uuid, containerName}]; !ok {
if _, found := desiredPods[uuid]; found {
// syncPod() will handle this one.
continue
}
pc := podContainer{podFullName, uuid, containerName}
if _, ok := desiredContainers[pc]; !ok {
glog.V(1).Infof("Killing unwanted container %+v", pc)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
glog.Errorf("Error killing container %+v: %s", pc, err)
}
}
}
@ -700,7 +696,7 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
for i := range pods {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs)
glog.Warningf("Pod %s: HostPort is already allocated, ignoring: %s", GetPodFullName(pod), errs)
continue
}
filtered = append(filtered, *pod)
@ -720,7 +716,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
case u := <-updates:
switch u.Op {
case SET, UPDATE:
glog.V(3).Infof("Containers changed [%s]", kl.hostname)
glog.V(3).Infof("Containers changed")
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
@ -728,6 +724,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
panic("syncLoop does not support incremental changes")
}
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
if kl.pods == nil {
continue
}
@ -735,30 +732,11 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
err := handler.SyncPods(kl.pods)
if err != nil {
glog.Errorf("Couldn't sync containers : %v", err)
glog.Errorf("Couldn't sync containers: %s", err)
}
}
}
func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.ContainerInfoRequest {
ret := &info.ContainerInfoRequest{
NumStats: req.NumStats,
}
return ret
}
// This method takes a container's absolute path and returns the stats for the
// container. The container's absolute path refers to its hierarchy in the
// cgroup file system. e.g. The root container, which represents the whole
// machine, has path "/"; all docker containers have path "/docker/<docker id>"
func (kl *Kubelet) statsFromContainerPath(cc CadvisorInterface, containerPath string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
cinfo, err := cc.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req))
if err != nil {
return nil, err
}
return cinfo, nil
}
// GetKubeletContainerLogs returns logs from the container
// The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
@ -789,40 +767,6 @@ func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) {
return dockertools.GetDockerPodInfo(kl.dockerClient, manifest, podFullName, uuid)
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName, uuid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
cc := kl.GetCadvisorClient()
if cc == nil {
return nil, nil
}
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
return nil, err
}
dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, containerName)
if !found {
return nil, errors.New("couldn't find container")
}
return kl.statsFromContainerPath(cc, fmt.Sprintf("/docker/%s", dockerContainer.ID), req)
}
// GetRootInfo returns stats (from Cadvisor) of current machine (root container).
func (kl *Kubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
cc := kl.GetCadvisorClient()
if cc == nil {
return nil, fmt.Errorf("no cadvisor connection")
}
return kl.statsFromContainerPath(cc, "/", req)
}
func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) {
cc := kl.GetCadvisorClient()
if cc == nil {
return nil, fmt.Errorf("no cadvisor connection")
}
return cc.MachineInfo()
}
func (kl *Kubelet) healthy(podFullName, podUUID string, currentState api.PodState, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
// Give the container 60 seconds to start up.
if container.LivenessProbe == nil {

View File

@ -183,7 +183,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "list"})
verifyCalls(t, fakeDocker, []string{"list"})
}
// drainWorkers waits until all workers are done. Should only be used for testing.
@ -231,7 +231,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "list", "inspect_container", "list", "create", "start"})
"list", "create", "start", "list", "inspect_container", "list", "create", "start"})
fakeDocker.Lock()
@ -279,7 +279,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "list", "inspect_container", "list", "create", "start"})
"list", "create", "start", "list", "inspect_container", "list", "create", "start"})
fakeDocker.Lock()
@ -324,7 +324,7 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) {
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "list", "create", "start"})
"list", "list", "inspect_container", "list", "create", "start"})
fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
@ -376,7 +376,7 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) {
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "list", "create", "start"})
"list", "list", "inspect_container", "list", "create", "start"})
fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
@ -418,7 +418,7 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "stop", "create", "start", "list", "list", "inspect_container", "list", "create", "start"})
"list", "stop", "create", "start", "list", "list", "inspect_container", "list", "create", "start"})
// A map iteration is used to delete containers, so must not depend on
// order here.
@ -455,7 +455,7 @@ func TestSyncPodsDeletes(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"})
// A map iteration is used to delete containers, so must not depend on
// order here.
@ -847,8 +847,7 @@ func TestGetContainerInfo(t *testing.T) {
}
mockCadvisor := &mockCadvisorClient{}
req := &info.ContainerInfoRequest{}
cadvisorReq := getCadvisorContainerInfoRequest(req)
cadvisorReq := &info.ContainerInfoRequest{}
mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil)
kubelet, _, fakeDocker := newTestKubelet(t)
@ -862,7 +861,7 @@ func TestGetContainerInfo(t *testing.T) {
},
}
stats, err := kubelet.GetContainerInfo("qux", "", "foo", req)
stats, err := kubelet.GetContainerInfo("qux", "", "foo", cadvisorReq)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -882,8 +881,7 @@ func TestGetRootInfo(t *testing.T) {
fakeDocker := dockertools.FakeDockerClient{}
mockCadvisor := &mockCadvisorClient{}
req := &info.ContainerInfoRequest{}
cadvisorReq := getCadvisorContainerInfoRequest(req)
cadvisorReq := &info.ContainerInfoRequest{}
mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil)
kubelet := Kubelet{
@ -894,7 +892,7 @@ func TestGetRootInfo(t *testing.T) {
}
// If the container name is an empty string, then it means the root container.
_, err := kubelet.GetRootInfo(req)
_, err := kubelet.GetRootInfo(cadvisorReq)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -925,8 +923,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
containerInfo := &info.ContainerInfo{}
mockCadvisor := &mockCadvisorClient{}
req := &info.ContainerInfoRequest{}
cadvisorReq := getCadvisorContainerInfoRequest(req)
cadvisorReq := &info.ContainerInfoRequest{}
expectedErr := fmt.Errorf("some error")
mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, expectedErr)
@ -941,7 +938,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
},
}
stats, err := kubelet.GetContainerInfo("qux", "uuid", "foo", req)
stats, err := kubelet.GetContainerInfo("qux", "uuid", "foo", cadvisorReq)
if stats != nil {
t.Errorf("non-nil stats on error")
}