From 82f782006604f3eec8c59204df5c54178037f38a Mon Sep 17 00:00:00 2001
From: Vishnu kannan
Date: Mon, 26 Jun 2017 12:49:00 -0700
Subject: [PATCH] Kubelet: Centralize Capacity discovery of standard resources
 in Container manager.

Have storage derive node capacity from container manager. Move certain
cAdvisor interfaces to the cAdvisor package in the process.

This patch fixes a bug in container manager where it was writing to a map
without synchronization.

Signed-off-by: Vishnu kannan
---
 pkg/kubelet/cadvisor/cadvisor_linux.go        | 13 ++++++
 pkg/kubelet/cadvisor/cadvisor_unsupported.go  |  4 ++
 pkg/kubelet/cadvisor/cadvisor_windows.go      |  4 ++
 pkg/kubelet/cadvisor/testing/cadvisor_fake.go |  4 ++
 pkg/kubelet/cadvisor/testing/cadvisor_mock.go |  5 +++
 pkg/kubelet/cadvisor/types.go                 |  3 ++
 pkg/kubelet/cm/container_manager.go           |  3 ++
 pkg/kubelet/cm/container_manager_linux.go     | 19 ++++++++
 pkg/kubelet/cm/container_manager_stub.go      |  4 ++
 pkg/kubelet/cm/node_container_manager.go      | 12 -----
 pkg/kubelet/kubelet.go                        |  2 +-
 pkg/kubelet/kubelet_cadvisor.go               | 13 ------
 pkg/kubelet/kubelet_cadvisor_test.go          | 32 -------------
 pkg/kubelet/kubelet_node_status.go            | 28 +++++-------
 pkg/kubelet/kubelet_node_status_test.go       | 45 +++++++++++++++++++
 15 files changed, 115 insertions(+), 76 deletions(-)
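The change is driven by two interface additions, condensed here from the hunks that follow: cadvisor.Interface learns to report whether the image filesystem is on a dedicated device, and cm.ContainerManager exposes the capacity it discovered at construction time. Everything else in the patch rewires callers (node status, eviction, tests) onto these methods.

// pkg/kubelet/cadvisor/types.go (excerpt of the addition below)
type Interface interface {
    // ... existing methods ...

    // HasDedicatedImageFs returns true iff a dedicated image filesystem exists for storing images.
    HasDedicatedImageFs() (bool, error)
}

// pkg/kubelet/cm/container_manager.go (excerpt of the addition below)
type ContainerManager interface {
    // ... existing methods ...

    // GetCapacity returns the amount of compute resources tracked by container manager available on the node.
    GetCapacity() v1.ResourceList
}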
diff --git a/pkg/kubelet/cadvisor/cadvisor_linux.go b/pkg/kubelet/cadvisor/cadvisor_linux.go
index f0392063e09..cdbe96a27e4 100644
--- a/pkg/kubelet/cadvisor/cadvisor_linux.go
+++ b/pkg/kubelet/cadvisor/cadvisor_linux.go
@@ -225,3 +225,16 @@ func (cc *cadvisorClient) getFsInfo(label string) (cadvisorapiv2.FsInfo, error)
 func (cc *cadvisorClient) WatchEvents(request *events.Request) (*events.EventChannel, error) {
     return cc.WatchForEvents(request)
 }
+
+// HasDedicatedImageFs returns true if the imagefs has a dedicated device.
+func (cc *cadvisorClient) HasDedicatedImageFs() (bool, error) {
+    imageFsInfo, err := cc.ImagesFsInfo()
+    if err != nil {
+        return false, err
+    }
+    rootFsInfo, err := cc.RootFsInfo()
+    if err != nil {
+        return false, err
+    }
+    return imageFsInfo.Device != rootFsInfo.Device, nil
+}
diff --git a/pkg/kubelet/cadvisor/cadvisor_unsupported.go b/pkg/kubelet/cadvisor/cadvisor_unsupported.go
index 23378242775..788629c3392 100644
--- a/pkg/kubelet/cadvisor/cadvisor_unsupported.go
+++ b/pkg/kubelet/cadvisor/cadvisor_unsupported.go
@@ -76,3 +76,7 @@ func (cu *cadvisorUnsupported) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
 func (cu *cadvisorUnsupported) WatchEvents(request *events.Request) (*events.EventChannel, error) {
     return nil, unsupportedErr
 }
+
+func (cu *cadvisorUnsupported) HasDedicatedImageFs() (bool, error) {
+    return false, unsupportedErr
+}
diff --git a/pkg/kubelet/cadvisor/cadvisor_windows.go b/pkg/kubelet/cadvisor/cadvisor_windows.go
index 832208658fc..f7e265ec678 100644
--- a/pkg/kubelet/cadvisor/cadvisor_windows.go
+++ b/pkg/kubelet/cadvisor/cadvisor_windows.go
@@ -73,3 +73,7 @@ func (cu *cadvisorClient) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
 func (cu *cadvisorClient) WatchEvents(request *events.Request) (*events.EventChannel, error) {
     return &events.EventChannel{}, nil
 }
+
+func (cu *cadvisorClient) HasDedicatedImageFs() (bool, error) {
+    return false, nil
+}
diff --git a/pkg/kubelet/cadvisor/testing/cadvisor_fake.go b/pkg/kubelet/cadvisor/testing/cadvisor_fake.go
index 894c6f0c976..64566712504 100644
--- a/pkg/kubelet/cadvisor/testing/cadvisor_fake.go
+++ b/pkg/kubelet/cadvisor/testing/cadvisor_fake.go
@@ -73,3 +73,7 @@ func (c *Fake) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
 func (c *Fake) WatchEvents(request *events.Request) (*events.EventChannel, error) {
     return new(events.EventChannel), nil
 }
+
+func (c *Fake) HasDedicatedImageFs() (bool, error) {
+    return false, nil
+}
diff --git a/pkg/kubelet/cadvisor/testing/cadvisor_mock.go b/pkg/kubelet/cadvisor/testing/cadvisor_mock.go
index a6ba72c8784..7848039178a 100644
--- a/pkg/kubelet/cadvisor/testing/cadvisor_mock.go
+++ b/pkg/kubelet/cadvisor/testing/cadvisor_mock.go
@@ -83,3 +83,8 @@ func (c *Mock) WatchEvents(request *events.Request) (*events.EventChannel, error
     args := c.Called()
     return args.Get(0).(*events.EventChannel), args.Error(1)
 }
+
+func (c *Mock) HasDedicatedImageFs() (bool, error) {
+    args := c.Called()
+    return args.Get(0).(bool), args.Error(1)
+}
diff --git a/pkg/kubelet/cadvisor/types.go b/pkg/kubelet/cadvisor/types.go
index de7d334c1b7..2a97ba35237 100644
--- a/pkg/kubelet/cadvisor/types.go
+++ b/pkg/kubelet/cadvisor/types.go
@@ -41,4 +41,7 @@ type Interface interface {
 
     // Get events streamed through passedChannel that fit the request.
     WatchEvents(request *events.Request) (*events.EventChannel, error)
+
+    // HasDedicatedImageFs returns true iff a dedicated image filesystem exists for storing images.
+    HasDedicatedImageFs() (bool, error)
 }
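HasDedicatedImageFs previously lived on the Kubelet (removed further down in kubelet_cadvisor.go); after this change any holder of a cadvisor.Interface can ask the question directly. A minimal usage sketch, not part of this patch (the helper name is hypothetical; the error-to-false fallback mirrors how the container manager hunk below consumes it):

func dedicatedImageFsOrDefault(ca cadvisor.Interface) bool {
    dedicated, err := ca.HasDedicatedImageFs()
    if err != nil {
        // The NewContainerManager call site in this patch ignores the error and
        // treats it as "no dedicated imagefs"; do the same here.
        return false
    }
    return dedicated
}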
diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go
index 467a97ca045..77e5dd187e4 100644
--- a/pkg/kubelet/cm/container_manager.go
+++ b/pkg/kubelet/cm/container_manager.go
@@ -60,6 +60,9 @@ type ContainerManager interface {
     // GetNodeAllocatable returns the amount of compute resources that have to be reserved from scheduling.
     GetNodeAllocatableReservation() v1.ResourceList
 
+    // GetCapacity returns the amount of compute resources tracked by container manager available on the node.
+    GetCapacity() v1.ResourceList
+
     // UpdateQOSCgroups performs housekeeping updates to ensure that the top
     // level QoS containers have their desired state in a thread-safe way
     UpdateQOSCgroups() error
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 8a144c4c03a..c652feb9ae4 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -224,6 +224,25 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
     } else {
         return nil, err
     }
+    rootfs, err := cadvisorInterface.RootFsInfo()
+    if err != nil {
+        capacity[v1.ResourceStorageScratch] = resource.MustParse("0Gi")
+    } else {
+        for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
+            capacity[rName] = rCap
+        }
+    }
+
+    if hasDedicatedImageFs, _ := cadvisorInterface.HasDedicatedImageFs(); hasDedicatedImageFs {
+        imagesfs, err := cadvisorInterface.ImagesFsInfo()
+        if err != nil {
+            glog.Errorf("Failed to get Image filesystem information: %v", err)
+        } else {
+            for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
+                capacity[rName] = rCap
+            }
+        }
+    }
 
     cgroupRoot := nodeConfig.CgroupRoot
     cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go
index caa9c611d5a..941913aceee 100644
--- a/pkg/kubelet/cm/container_manager_stub.go
+++ b/pkg/kubelet/cm/container_manager_stub.go
@@ -58,6 +58,10 @@ func (cm *containerManagerStub) GetNodeAllocatableReservation() v1.ResourceList
     return nil
 }
 
+func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
+    return nil
+}
+
 func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
     return &podContainerManagerStub{}
 }
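The Linux container manager now discovers local-storage capacity once, in NewContainerManager, and callers read it through GetCapacity. Doing the discovery at construction time is also what removes the unsynchronized map write called out in the commit message, since the lazy population in GetNodeAllocatableReservation goes away in the next file. Pulled out as a standalone sketch (same calls and helpers as the hunk above, error handling trimmed; the function name is hypothetical):

func localStorageCapacity(ca cadvisor.Interface) v1.ResourceList {
    capacity := v1.ResourceList{}
    // Scratch space comes from the root filesystem; fall back to zero on error.
    if rootfs, err := ca.RootFsInfo(); err == nil {
        for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
            capacity[rName] = rCap
        }
    } else {
        capacity[v1.ResourceStorageScratch] = resource.MustParse("0Gi")
    }
    // Overlay (image) space is only reported when images live on their own device.
    if dedicated, _ := ca.HasDedicatedImageFs(); dedicated {
        if imagesfs, err := ca.ImagesFsInfo(); err == nil {
            for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
                capacity[rName] = rCap
            }
        }
    }
    return capacity
}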
diff --git a/pkg/kubelet/cm/node_container_manager.go b/pkg/kubelet/cm/node_container_manager.go
index 11f321ed8c3..bb6359f2d4e 100644
--- a/pkg/kubelet/cm/node_container_manager.go
+++ b/pkg/kubelet/cm/node_container_manager.go
@@ -29,7 +29,6 @@ import (
     clientv1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/kubernetes/pkg/kubelet/cadvisor"
     "k8s.io/kubernetes/pkg/kubelet/events"
     evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
 )
@@ -184,17 +183,6 @@ func (cm *containerManagerImpl) getNodeAllocatableAbsolute() v1.ResourceList {
 // GetNodeAllocatable returns amount of compute or storage resource that have to be reserved on this node from scheduling.
 func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
     evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
-    if _, ok := cm.capacity[v1.ResourceStorage]; !ok {
-        if cm.cadvisorInterface != nil {
-            if rootfs, err := cm.cadvisorInterface.RootFsInfo(); err == nil {
-                for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
-                    cm.capacity[rName] = rCap
-                }
-            } else {
-                glog.Warning("Error getting rootfs info: %v", err)
-            }
-        }
-    }
     result := make(v1.ResourceList)
     for k := range cm.capacity {
         value := resource.NewQuantity(0, resource.DecimalSI)
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index c76cb7532cd..bdbf2aaddaf 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1301,7 +1301,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
         glog.Fatalf("Failed to start cAdvisor %v", err)
     }
     // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
-    kl.evictionManager.Start(kl, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod)
+    kl.evictionManager.Start(kl.cadvisor, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod)
 }
 
 // Run starts the kubelet reacting to config updates
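The block deleted above is the racy write: GetNodeAllocatableReservation used to lazily populate cm.capacity on the request path, so concurrent callers could write the same map. With capacity filled in once by NewContainerManager, the lazy path disappears. The kubelet.go change works because the eviction manager only needs its first argument to answer HasDedicatedImageFs, which the cadvisor client now does. A reduced sketch of that dependency, not part of this patch (the interface name here is illustrative; the real one lives in the eviction package):

// Anything that can say whether images live on their own filesystem will do.
type diskInfoProvider interface {
    HasDedicatedImageFs() (bool, error)
}

// After this patch the cadvisor client satisfies it, which is why kl.cadvisor
// can replace kl as the first argument to evictionManager.Start.
var _ diskInfoProvider = cadvisor.Interface(nil)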
diff --git a/pkg/kubelet/kubelet_cadvisor.go b/pkg/kubelet/kubelet_cadvisor.go
index dcb3de9728e..779095f5675 100644
--- a/pkg/kubelet/kubelet_cadvisor.go
+++ b/pkg/kubelet/kubelet_cadvisor.go
@@ -45,19 +45,6 @@ func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, contai
     return &ci, nil
 }
 
-// HasDedicatedImageFs returns true if the imagefs has a dedicated device.
-func (kl *Kubelet) HasDedicatedImageFs() (bool, error) {
-    imageFsInfo, err := kl.ImagesFsInfo()
-    if err != nil {
-        return false, err
-    }
-    rootFsInfo, err := kl.RootFsInfo()
-    if err != nil {
-        return false, err
-    }
-    return imageFsInfo.Device != rootFsInfo.Device, nil
-}
-
 // GetContainerInfoV2 returns stats (from Cadvisor) for containers.
 func (kl *Kubelet) GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
     return kl.cadvisor.ContainerInfoV2(name, options)
diff --git a/pkg/kubelet/kubelet_cadvisor_test.go b/pkg/kubelet/kubelet_cadvisor_test.go
index bfbdc9e8812..aa269f68f9d 100644
--- a/pkg/kubelet/kubelet_cadvisor_test.go
+++ b/pkg/kubelet/kubelet_cadvisor_test.go
@@ -24,7 +24,6 @@ import (
     "github.com/stretchr/testify/require"
 
     cadvisorapi "github.com/google/cadvisor/info/v1"
-    cadvisorapiv2 "github.com/google/cadvisor/info/v2"
     "k8s.io/apimachinery/pkg/types"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
@@ -251,34 +250,3 @@ func TestGetRawContainerInfoSubcontainers(t *testing.T) {
     assert.Len(t, result, 2)
     mockCadvisor.AssertExpectations(t)
 }
-
-func TestHasDedicatedImageFs(t *testing.T) {
-    testCases := map[string]struct {
-        imageFsInfo cadvisorapiv2.FsInfo
-        rootFsInfo  cadvisorapiv2.FsInfo
-        expected    bool
-    }{
-        "has-dedicated-image-fs": {
-            imageFsInfo: cadvisorapiv2.FsInfo{Device: "123"},
-            rootFsInfo:  cadvisorapiv2.FsInfo{Device: "456"},
-            expected:    true,
-        },
-        "has-unified-image-fs": {
-            imageFsInfo: cadvisorapiv2.FsInfo{Device: "123"},
-            rootFsInfo:  cadvisorapiv2.FsInfo{Device: "123"},
-            expected:    false,
-        },
-    }
-    for testName, testCase := range testCases {
-        testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
-        defer testKubelet.Cleanup()
-        kubelet := testKubelet.kubelet
-        mockCadvisor := testKubelet.fakeCadvisor
-        mockCadvisor.On("Start").Return(nil)
-        mockCadvisor.On("ImagesFsInfo").Return(testCase.imageFsInfo, nil)
-        mockCadvisor.On("RootFsInfo").Return(testCase.rootFsInfo, nil)
-        actual, err := kubelet.HasDedicatedImageFs()
-        assert.NoError(t, err, "test [%s]", testName)
-        assert.Equal(t, testCase.expected, actual, "test [%s]", testName)
-    }
-}
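TestHasDedicatedImageFs is deleted rather than moved because the Kubelet no longer owns that logic; the device comparison now lives in the cadvisor client. Consumer tests can stub the answer on the cadvisor Mock directly instead of wiring ImagesFsInfo and RootFsInfo through the Kubelet. A sketch of that pattern, not part of this patch (assumes the usual import alias cadvisortest for pkg/kubelet/cadvisor/testing and testify's assert):

func TestConsumerSeesDedicatedImageFs(t *testing.T) {
    mockCadvisor := new(cadvisortest.Mock)
    // The Mock method added earlier in this patch returns whatever is stubbed here.
    mockCadvisor.On("HasDedicatedImageFs").Return(true, nil)

    dedicated, err := mockCadvisor.HasDedicatedImageFs()
    assert.NoError(t, err)
    assert.True(t, dedicated)
    mockCadvisor.AssertExpectations(t)
}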
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index 09e4230122f..493ac03eaba 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -549,6 +549,7 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
             node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
                 int64(kl.maxPods), resource.DecimalSI)
         }
+
         if node.Status.NodeInfo.BootID != "" &&
             node.Status.NodeInfo.BootID != info.BootID {
             // TODO: This requires a transaction, either both node status is updated
@@ -557,25 +558,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
                 "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
         }
         node.Status.NodeInfo.BootID = info.BootID
-    }
-    if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
-        rootfs, err := kl.GetCachedRootFsInfo()
-        if err != nil {
-            node.Status.Capacity[v1.ResourceStorageScratch] = resource.MustParse("0Gi")
-        } else {
-            for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
-                node.Status.Capacity[rName] = rCap
-            }
-        }
-
-        if hasDedicatedImageFs, _ := kl.HasDedicatedImageFs(); hasDedicatedImageFs {
-            imagesfs, err := kl.ImagesFsInfo()
-            if err != nil {
-                node.Status.Capacity[v1.ResourceStorageOverlay] = resource.MustParse("0Gi")
-            } else {
-                for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
-                    node.Status.Capacity[rName] = rCap
+        if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
+            // TODO: all the node resources should use GetCapacity instead of deriving the
+            // capacity for every node status request
+            initialCapacity := kl.containerManager.GetCapacity()
+            if initialCapacity != nil {
+                node.Status.Capacity[v1.ResourceStorageScratch] = initialCapacity[v1.ResourceStorageScratch]
+                imageCapacity, ok := initialCapacity[v1.ResourceStorageOverlay]
+                if ok {
+                    node.Status.Capacity[v1.ResourceStorageOverlay] = imageCapacity
                 }
             }
         }
     }
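Node status now copies the container manager's pre-computed values instead of re-deriving them from cadvisor on every sync: scratch capacity is taken unconditionally from GetCapacity, and overlay capacity only when the container manager reported one (i.e. when a dedicated imagefs was detected). A small read-side sketch of that contract, not part of this patch (helper name hypothetical):

func storageCapacityFromManager(mgr cm.ContainerManager) (scratch resource.Quantity, overlay resource.Quantity, hasOverlay bool) {
    c := mgr.GetCapacity()
    if c == nil {
        // The stub container manager returns nil; callers must tolerate it.
        return
    }
    scratch = c[v1.ResourceStorageScratch] // zero value if the key is missing
    overlay, hasOverlay = c[v1.ResourceStorageOverlay]
    return
}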
diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
index 69f4278b161..6337e8bd117 100644
--- a/pkg/kubelet/kubelet_node_status_test.go
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -115,12 +115,17 @@ func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error)
 type localCM struct {
     cm.ContainerManager
     allocatable v1.ResourceList
+    capacity    v1.ResourceList
 }
 
 func (lcm *localCM) GetNodeAllocatableReservation() v1.ResourceList {
     return lcm.allocatable
 }
 
+func (lcm *localCM) GetCapacity() v1.ResourceList {
+    return lcm.capacity
+}
+
 func TestUpdateNewNodeStatus(t *testing.T) {
     // generate one more than maxImagesInNodeStatus in inputImageList
     inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1)
@@ -134,6 +139,10 @@ func TestUpdateNewNodeStatus(t *testing.T) {
             v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
             v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
         },
+        capacity: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
+        },
     }
     kubeClient := testKubelet.fakeKubeClient
     existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
@@ -251,6 +260,18 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.containerManager = &localCM{
+        ContainerManager: cm.NewStubContainerManager(),
+        allocatable: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
+        },
+        capacity: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
+        },
+    }
+
     kubeClient := testKubelet.fakeKubeClient
     existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
     kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
@@ -319,6 +340,10 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
             v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
             v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
         },
+        capacity: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
+        },
     }
 
     kubeClient := testKubelet.fakeKubeClient
@@ -503,6 +528,18 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.containerManager = &localCM{
+        ContainerManager: cm.NewStubContainerManager(),
+        allocatable: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
+        },
+        capacity: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
+        },
+    }
+
     clock := testKubelet.fakeClock
     // Do not set nano second, because apiserver function doesn't support nano second. (Only support
     // RFC3339).
@@ -658,6 +695,10 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
             v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
             v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
         },
+        capacity: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
+        },
     }
 
     clock := testKubelet.fakeClock
@@ -1113,6 +1154,10 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
         allocatable: v1.ResourceList{
             v1.ResourceCPU: *resource.NewMilliQuantity(40000, resource.DecimalSI),
         },
+        capacity: v1.ResourceList{
+            v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
+        },
     }
     kubeClient := testKubelet.fakeKubeClient
     existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
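localCM embeds the cm.ContainerManager interface and overrides only what the node status tests care about, so the new capacity field plus the GetCapacity override is all that is needed to exercise the storage-capacity path above. A small illustration of how the override resolves against the embedded stub, not part of this patch (values arbitrary):

mgr := &localCM{
    ContainerManager: cm.NewStubContainerManager(), // the stub's GetCapacity returns nil
    capacity: v1.ResourceList{
        v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
    },
}
fmt.Println(mgr.GetCapacity())                  // localCM override: the capacity field above
fmt.Println(mgr.ContainerManager.GetCapacity()) // embedded stub: nil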