Merge pull request #5143 from vmarmol/cadvisor-pkg

Refactoring Kubelet's cAdvisor interface into a package.
This commit is contained in:
Victor Marmol 2015-03-09 11:36:03 -07:00
commit e187be9fe1
9 changed files with 245 additions and 259 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt"
"math/rand"
"net"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -30,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
@ -38,6 +40,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
cadvisorClient "github.com/google/cadvisor/client"
"github.com/spf13/pflag"
)
@ -398,6 +401,16 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
} else {
kubeClient = kc.KubeClient
}
cc, err := cadvisorClient.NewClient("http://127.0.0.1:" + strconv.Itoa(int(kc.CAdvisorPort)))
if err != nil {
return nil, err
}
cadvisorInterface, err := cadvisor.New(cc)
if err != nil {
return nil, err
}
k, err := kubelet.NewMainKubelet(
kc.Hostname,
kc.DockerClient,
@ -416,7 +429,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.StreamingConnectionIdleTimeout,
kc.Recorder)
kc.Recorder,
cadvisorInterface)
if err != nil {
return nil, err
@ -425,7 +439,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
k.BirthCry()
go k.GarbageCollectLoop()
go kubelet.MonitorCAdvisor(k, kc.CAdvisorPort)
return k, nil
}

View File

@ -1,108 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"errors"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
var (
// ErrNoKubeletContainers returned when there are not containers managed by the kubelet (ie: either no containers on the node, or none that the kubelet cares about).
ErrNoKubeletContainers = errors.New("no containers managed by kubelet")
// ErrContainerNotFound returned when a container in the given pod with the given container name was not found, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
// ErrCadvisorApiFailure returned when cadvisor couldn't retrieve stats for the given container, either because it isn't running or it was confused by the request
ErrCadvisorApiFailure = errors.New("failed to retrieve cadvisor stats")
)
// cadvisorInterface is an abstract interface for testability. It abstracts the interface of "github.com/google/cadvisor/client".Client.
type cadvisorInterface interface {
DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error)
ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
MachineInfo() (*cadvisorApi.MachineInfo, error)
}
// statsFromContainerPath takes a container's absolute path and returns the stats for the
// container. The container's absolute path refers to its hierarchy in the
// cgroup file system. e.g. The root container, which represents the whole
// machine, has path "/"; all docker containers have path "/docker/<docker id>"
func statsFromContainerPath(cc cadvisorInterface, containerPath string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
cinfo, err := cc.ContainerInfo(containerPath, req)
if err != nil {
return nil, err
}
return cinfo, nil
}
// statsFromDockerContainer takes a Docker container's ID and returns the stats for the
// container.
func statsFromDockerContainer(cc cadvisorInterface, containerId string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
cinfo, err := cc.DockerContainer(containerId, req)
if err != nil {
return nil, err
}
return &cinfo, nil
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
cc := kl.GetCadvisorClient()
if cc == nil {
return nil, fmt.Errorf("no cadvisor connection")
}
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
return nil, err
}
if len(dockerContainers) == 0 {
return nil, ErrNoKubeletContainers
}
dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, containerName)
if !found {
return nil, ErrContainerNotFound
}
ci, err := statsFromDockerContainer(cc, dockerContainer.ID, req)
if err != nil {
return nil, ErrCadvisorApiFailure
}
return ci, nil
}
// GetRootInfo returns stats (from Cadvisor) of current machine (root container).
func (kl *Kubelet) GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
cc := kl.GetCadvisorClient()
if cc == nil {
return nil, fmt.Errorf("no cadvisor connection")
}
return statsFromContainerPath(cc, "/", req)
}
func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
cc := kl.GetCadvisorClient()
if cc == nil {
return nil, fmt.Errorf("no cadvisor connection")
}
return cc.MachineInfo()
}

View File

@ -0,0 +1,31 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cadvisor
import (
"github.com/google/cadvisor/client"
)
type cadvisorClient struct {
*client.Client
}
func New(cc *client.Client) (Interface, error) {
return &cadvisorClient{
Client: cc,
}, nil
}

View File

@ -0,0 +1,44 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cadvisor
import (
cadvisorApi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/mock"
)
type Mock struct {
mock.Mock
}
// ContainerInfo is a mock implementation of CadvisorInterface.ContainerInfo.
func (c *Mock) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
args := c.Called(name, req)
return args.Get(0).(*cadvisorApi.ContainerInfo), args.Error(1)
}
// DockerContainer is a mock implementation of CadvisorInterface.DockerContainer.
func (c *Mock) DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error) {
args := c.Called(name, req)
return args.Get(0).(cadvisorApi.ContainerInfo), args.Error(1)
}
// MachineInfo is a mock implementation of CadvisorInterface.MachineInfo.
func (c *Mock) MachineInfo() (*cadvisorApi.MachineInfo, error) {
args := c.Called()
return args.Get(0).(*cadvisorApi.MachineInfo), args.Error(1)
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Kubelet interactions with cAdvisor.
package cadvisor

View File

@ -0,0 +1,28 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cadvisor
import (
cadvisorApi "github.com/google/cadvisor/info/v1"
)
// Interface is an abstract interface for testability. It abstracts the interface to cAdvisor.
type Interface interface {
DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error)
ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
MachineInfo() (*cadvisorApi.MachineInfo, error)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package kubelet
import (
"errors"
"fmt"
"io"
"io/ioutil"
@ -36,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
@ -46,10 +48,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
const (
@ -66,6 +69,14 @@ const (
maxWaitForDocker = 5 * time.Minute
)
var (
// ErrNoKubeletContainers returned when there are not containers managed by the kubelet (ie: either no containers on the node, or none that the kubelet cares about).
ErrNoKubeletContainers = errors.New("no containers managed by kubelet")
// ErrContainerNotFound returned when a container in the given pod with the given container name was not found, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
)
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
@ -97,7 +108,8 @@ func NewMainKubelet(
masterServiceNamespace string,
volumePlugins []volume.Plugin,
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder) (*Kubelet, error) {
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -165,6 +177,7 @@ func NewMainKubelet(
readiness: newReadinessStates(),
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
cadvisor: cadvisorInterface,
}
dockerCache, err := dockertools.NewDockerCache(dockerClient)
@ -234,9 +247,8 @@ type Kubelet struct {
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
pullBurst int
// Optional, no statistics will be available if omitted
cadvisorClient cadvisorInterface
cadvisorLock sync.RWMutex
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Optional, minimum age required for garbage collection. If zero, no limit.
minimumGCAge time.Duration
@ -476,20 +488,6 @@ func (kl *Kubelet) GarbageCollectContainers() error {
return nil
}
// SetCadvisorClient sets the cadvisor client in a thread-safe way.
func (kl *Kubelet) SetCadvisorClient(c cadvisorInterface) {
kl.cadvisorLock.Lock()
defer kl.cadvisorLock.Unlock()
kl.cadvisorClient = c
}
// GetCadvisorClient gets the cadvisor client.
func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
kl.cadvisorLock.RLock()
defer kl.cadvisorLock.RUnlock()
return kl.cadvisorClient
}
func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
kl.podStatusesLock.RLock()
defer kl.podStatusesLock.RUnlock()
@ -1310,7 +1308,7 @@ func (kl *Kubelet) cleanupOrphanedPods(pods []api.BoundPod) error {
}
}
}
return errors.NewAggregate(errlist)
return utilErrors.NewAggregate(errlist)
}
// Compares the map of current volumes to the map of desired volumes.
@ -1921,3 +1919,33 @@ func (kl *Kubelet) BirthCry() {
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
return nil, err
}
if len(dockerContainers) == 0 {
return nil, ErrNoKubeletContainers
}
dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, containerName)
if !found {
return nil, ErrContainerNotFound
}
ci, err := kl.cadvisor.DockerContainer(dockerContainer.ID, req)
if err != nil {
return nil, err
}
return &ci, nil
}
// GetRootInfo returns stats (from Cadvisor) of current machine (root container).
func (kl *Kubelet) GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
return kl.cadvisor.ContainerInfo("/", req)
}
func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
return kl.cadvisor.MachineInfo()
}

View File

@ -34,6 +34,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
@ -42,7 +43,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
cadvisorApi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/mock"
)
func init() {
@ -50,7 +50,8 @@ func init() {
util.ReallyCrash = true
}
func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup) {
// TODO(vmarmol): Consider compacting these return types of handling this better.
func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup, *cadvisor.Mock) {
fakeDocker := &dockertools.FakeDockerClient{
RemovedImages: util.StringSet{},
}
@ -87,8 +88,10 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
mockCadvisor := &cadvisor.Mock{}
kubelet.cadvisor = mockCadvisor
return kubelet, fakeDocker, waitGroup
return kubelet, fakeDocker, waitGroup, mockCadvisor
}
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
@ -140,7 +143,7 @@ func verifyBoolean(t *testing.T, expected, value bool) {
}
func TestKubeletDirs(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
root := kubelet.rootDirectory
var exp, got string
@ -201,7 +204,7 @@ func TestKubeletDirs(t *testing.T) {
}
func TestKubeletDirsCompat(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
root := kubelet.rootDirectory
if err := os.MkdirAll(root, 0750); err != nil {
t.Fatalf("can't mkdir(%q): %s", root, err)
@ -307,7 +310,7 @@ func TestKillContainerWithError(t *testing.T) {
Err: fmt.Errorf("sample error"),
ContainerList: append([]docker.APIContainers{}, containers...),
}
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
for _, c := range fakeDocker.ContainerList {
kubelet.readiness.set(c.ID, true)
}
@ -338,7 +341,7 @@ func TestKillContainer(t *testing.T) {
Names: []string{"/k8s_bar_qux_5678_42"},
},
}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...)
fakeDocker.Container = &docker.Container{
Name: "foobar",
@ -391,7 +394,7 @@ func (cr *channelReader) GetList() [][]api.BoundPod {
var emptyPodUIDs map[types.UID]metrics.SyncPodType
func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
container := api.Container{Name: "bar"}
fakeDocker.ContainerList = []docker.APIContainers{
{
@ -429,7 +432,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
}
func TestSyncPodsWithTerminationLog(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
container := api.Container{
Name: "bar",
TerminationMessagePath: "/dev/somepath",
@ -478,7 +481,7 @@ func matchString(t *testing.T, pattern, str string) bool {
}
func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
kubelet.podInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.pods = []api.BoundPod{
@ -526,7 +529,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
}
func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{}
kubelet.podInfraContainerImage = "custom_image_name"
@ -570,7 +573,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
}
func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
// pod infra container
@ -611,7 +614,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
}
func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
fakeHttp := fakeHTTP{}
kubelet.httpClient = &fakeHttp
fakeDocker.ContainerList = []docker.APIContainers{
@ -668,7 +671,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
}
func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
// format is // k8s_<container-id>_<pod-fullname>_<pod-uid>
@ -714,7 +717,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.sourcesReady = func() bool { return ready }
fakeDocker.ContainerList = []docker.APIContainers{
@ -756,7 +759,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
}
func TestSyncPodsDeletes(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
@ -794,7 +797,7 @@ func TestSyncPodsDeletes(t *testing.T) {
}
func TestSyncPodDeletesDuplicate(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
@ -838,7 +841,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
}
func TestSyncPodBadHash(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
@ -886,7 +889,7 @@ func TestSyncPodBadHash(t *testing.T) {
}
func TestSyncPodUnhealthy(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
@ -935,7 +938,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
}
func TestMountExternalVolumes(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet})
pod := api.BoundPod{
@ -969,7 +972,7 @@ func TestMountExternalVolumes(t *testing.T) {
}
func TestGetPodVolumesFromDisk(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
plug := &volume.FakePlugin{"fake", nil}
kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet})
@ -1172,10 +1175,6 @@ func TestFieldPath(t *testing.T) {
}
}
type mockCadvisorClient struct {
mock.Mock
}
type errorTestingDockerClient struct {
dockertools.FakeDockerClient
listContainersError error
@ -1186,24 +1185,6 @@ func (f *errorTestingDockerClient) ListContainers(options docker.ListContainersO
return f.containerList, f.listContainersError
}
// ContainerInfo is a mock implementation of CadvisorInterface.ContainerInfo.
func (c *mockCadvisorClient) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
args := c.Called(name, req)
return args.Get(0).(*cadvisorApi.ContainerInfo), args.Error(1)
}
// DockerContainer is a mock implementation of CadvisorInterface.DockerContainer.
func (c *mockCadvisorClient) DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error) {
args := c.Called(name, req)
return args.Get(0).(cadvisorApi.ContainerInfo), args.Error(1)
}
// MachineInfo is a mock implementation of CadvisorInterface.MachineInfo.
func (c *mockCadvisorClient) MachineInfo() (*cadvisorApi.MachineInfo, error) {
args := c.Called()
return args.Get(0).(*cadvisorApi.MachineInfo), args.Error(1)
}
func TestGetContainerInfo(t *testing.T) {
containerID := "ab2cdf"
containerPath := fmt.Sprintf("/docker/%v", containerID)
@ -1213,12 +1194,9 @@ func TestGetContainerInfo(t *testing.T) {
},
}
mockCadvisor := &mockCadvisorClient{}
kubelet, fakeDocker, _, mockCadvisor := newTestKubelet(t)
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
@ -1247,14 +1225,14 @@ func TestGetRootInfo(t *testing.T) {
}
fakeDocker := dockertools.FakeDockerClient{}
mockCadvisor := &mockCadvisorClient{}
mockCadvisor := &cadvisor.Mock{}
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil)
kubelet := Kubelet{
dockerClient: &fakeDocker,
dockerPuller: &dockertools.FakeDockerPuller{},
cadvisorClient: mockCadvisor,
dockerClient: &fakeDocker,
dockerPuller: &dockertools.FakeDockerPuller{},
cadvisor: mockCadvisor,
}
// If the container name is an empty string, then it means the root container.
@ -1265,34 +1243,14 @@ func TestGetRootInfo(t *testing.T) {
mockCadvisor.AssertExpectations(t)
}
func TestGetContainerInfoWithoutCadvisor(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
ID: "foobar",
// pod id: qux
// container id: foo
Names: []string{"/k8s_foo_qux_uuid_1234"},
},
}
stats, _ := kubelet.GetContainerInfo("qux", "uuid", "foo", nil)
// When there's no cAdvisor, the stats should be either nil or empty
if stats == nil {
return
}
}
func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
containerID := "ab2cdf"
kubelet, fakeDocker, _, mockCadvisor := newTestKubelet(t)
cadvisorApiFailure := fmt.Errorf("cAdvisor failure")
containerInfo := cadvisorApi.ContainerInfo{}
mockCadvisor := &mockCadvisorClient{}
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, ErrCadvisorApiFailure)
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, cadvisorApiFailure)
fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
@ -1310,17 +1268,14 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
t.Errorf("expect error but received nil error")
return
}
if err.Error() != ErrCadvisorApiFailure.Error() {
t.Errorf("wrong error message. expect %v, got %v", ErrCadvisorApiFailure, err)
if err.Error() != cadvisorApiFailure.Error() {
t.Errorf("wrong error message. expect %v, got %v", cadvisorApiFailure, err)
}
mockCadvisor.AssertExpectations(t)
}
func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
kubelet, fakeDocker, _, mockCadvisor := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{}
stats, _ := kubelet.GetContainerInfo("qux", "", "foo", nil)
@ -1331,10 +1286,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
}
func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, _, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
kubelet, _, _, mockCadvisor := newTestKubelet(t)
expectedErr := fmt.Errorf("List containers error")
kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr}
@ -1348,13 +1300,11 @@ func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) {
if stats != nil {
t.Errorf("non-nil stats when dockertools failed")
}
mockCadvisor.AssertExpectations(t)
}
func TestGetContainerInfoWithNoContainers(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, _, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
kubelet, _, _, mockCadvisor := newTestKubelet(t)
kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil}
stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", nil)
@ -1367,13 +1317,11 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) {
if stats != nil {
t.Errorf("non-nil stats when dockertools returned no containers")
}
mockCadvisor.AssertExpectations(t)
}
func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, _, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
kubelet, _, _, mockCadvisor := newTestKubelet(t)
containerList := []docker.APIContainers{
{
@ -1393,6 +1341,7 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
if stats != nil {
t.Errorf("non-nil stats when dockertools returned no containers")
}
mockCadvisor.AssertExpectations(t)
}
type fakeContainerCommandRunner struct {
@ -1436,7 +1385,7 @@ func (f *fakeContainerCommandRunner) PortForward(podInfraContainerID string, por
func TestRunInContainerNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner
@ -1458,7 +1407,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
func TestRunInContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
containerID := "abc1234"
@ -1498,7 +1447,7 @@ func TestRunInContainer(t *testing.T) {
func TestRunHandlerExec(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
containerID := "abc1234"
@ -1546,7 +1495,7 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) {
func TestRunHandlerHttp(t *testing.T) {
fakeHttp := fakeHTTP{}
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
kubelet.httpClient = &fakeHttp
podName := "podFoo"
@ -1575,7 +1524,7 @@ func TestRunHandlerHttp(t *testing.T) {
}
func TestNewHandler(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
handler := &api.Handler{
HTTPGet: &api.HTTPGetAction{
Host: "foo",
@ -1606,7 +1555,7 @@ func TestNewHandler(t *testing.T) {
}
func TestSyncPodEventHandlerFails(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.httpClient = &fakeHTTP{
err: fmt.Errorf("test error"),
}
@ -1794,7 +1743,7 @@ func TestKubeletGarbageCollection(t *testing.T) {
},
}
for _, test := range tests {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.maxContainerCount = 2
fakeDocker.ContainerList = test.containers
fakeDocker.ContainerMap = test.containerDetails
@ -1959,7 +1908,7 @@ func TestPurgeOldest(t *testing.T) {
},
}
for _, test := range tests {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.maxContainerCount = 5
fakeDocker.ContainerMap = test.containerDetails
kubelet.purgeOldest(test.ids)
@ -1970,7 +1919,7 @@ func TestPurgeOldest(t *testing.T) {
}
func TestSyncPodsWithPullPolicy(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{"existing_one", "want:latest"}
kubelet.podInfraContainerImage = "custom_image_name"
@ -2303,7 +2252,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
}
for _, tc := range testCases {
kl, _, _ := newTestKubelet(t)
kl, _, _, _ := newTestKubelet(t)
kl.masterServiceNamespace = tc.masterServiceNamespace
if tc.nilLister {
kl.serviceLister = nil
@ -2740,7 +2689,7 @@ func TestGetPodReadyCondition(t *testing.T) {
func TestExecInContainerNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner
@ -2767,7 +2716,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
func TestExecInContainerNoSuchContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
@ -2819,7 +2768,7 @@ func (f *fakeReadWriteCloser) Close() error {
func TestExecInContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
@ -2877,7 +2826,7 @@ func TestExecInContainer(t *testing.T) {
func TestPortForwardNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner
@ -2901,7 +2850,7 @@ func TestPortForwardNoSuchPod(t *testing.T) {
func TestPortForwardNoSuchContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
@ -2935,7 +2884,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) {
func TestPortForward(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker, _, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
@ -3008,7 +2957,7 @@ func TestGetHostPortConflicts(t *testing.T) {
// Tests that we handle port conflicts correctly by setting the failed status in status map.
func TestHandlePortConflicts(t *testing.T) {
kl, _, _ := newTestKubelet(t)
kl, _, _, _ := newTestKubelet(t)
spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
pods := []api.BoundPod{
{
@ -3059,7 +3008,7 @@ func TestHandlePortConflicts(t *testing.T) {
}
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
kl, _, _ := newTestKubelet(t)
kl, _, _, _ := newTestKubelet(t)
pods := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
@ -3077,7 +3026,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
}
func TestValidatePodStatus(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
testCases := []struct {
podPhase api.PodPhase
success bool
@ -3102,7 +3051,7 @@ func TestValidatePodStatus(t *testing.T) {
}
func TestValidateContainerStatus(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _, _, _ := newTestKubelet(t)
containerName := "x"
testCases := []struct {
podInfo api.PodInfo

View File

@ -17,8 +17,6 @@ limitations under the License.
package kubelet
import (
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@ -26,23 +24,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
cadvisor "github.com/google/cadvisor/client"
)
// TODO: move this into the kubelet itself
func MonitorCAdvisor(k *Kubelet, cp uint) {
defer util.HandleCrash()
// TODO: Monitor this connection, reconnect if needed?
glog.V(1).Infof("Trying to create cadvisor client.")
cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:" + strconv.Itoa(int(cp)))
if err != nil {
glog.Errorf("Error on creating cadvisor client: %v", err)
return
}
glog.V(1).Infof("Successfully created cadvisor client.")
k.SetCadvisorClient(cadvisorClient)
}
// TODO: move this into a pkg/tools/etcd_tools
func EtcdClientOrDie(etcdServerList util.StringList, etcdConfigFile string) tools.EtcdClient {
if len(etcdServerList) > 0 {