Centralize capacity discovery of standard resources in the container manager.
Have storage derive node capacity from the container manager.
Move certain cAdvisor interfaces (such as HasDedicatedImageFs) into the cadvisor package in the process.

This patch also fixes a bug in the container manager, which was writing to a map without synchronization.

Signed-off-by: Vishnu kannan <vishnuk@google.com>
Vishnu Kannan, 2017-06-26 12:49:00 -07:00, committed by Michelle Au
parent cb1f5605a6
commit 82f7820066
15 changed files with 115 additions and 76 deletions
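
The synchronization fix is visible in the hunks below: GetNodeAllocatableReservation no longer writes storage capacity into the shared cm.capacity map on every call; instead, NewContainerManager discovers capacity once, before the manager is handed to other components, and GetCapacity only reads it. The following is a simplified, hypothetical sketch of that pattern; the type and function names here are invented for illustration and are not the Kubernetes code.

// Illustrative sketch only (not the actual Kubernetes code): the race pattern being
// fixed is "lazily fill a shared map inside request handlers". Populating the map
// once, before it is shared, removes the unsynchronized writes entirely.
package main

import "fmt"

type resourceList map[string]int64

type containerManager struct {
	// capacity is written only during construction; afterwards it is read-only,
	// so concurrent readers need no locking.
	capacity resourceList
}

// newContainerManager discovers capacity up front instead of mutating the map
// later from read paths such as GetNodeAllocatableReservation.
func newContainerManager(discover func() resourceList) *containerManager {
	capacity := resourceList{}
	for name, value := range discover() {
		capacity[name] = value
	}
	return &containerManager{capacity: capacity}
}

// GetCapacity returns the capacity snapshot taken at construction time.
func (cm *containerManager) GetCapacity() resourceList {
	return cm.capacity
}

func main() {
	cm := newContainerManager(func() resourceList {
		return resourceList{"cpu": 2000, "memory": 10e9, "storage.scratch": 100e9}
	})
	fmt.Println(cm.GetCapacity())
}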

View File

@@ -225,3 +225,16 @@ func (cc *cadvisorClient) getFsInfo(label string) (cadvisorapiv2.FsInfo, error)
 func (cc *cadvisorClient) WatchEvents(request *events.Request) (*events.EventChannel, error) {
 	return cc.WatchForEvents(request)
 }
+
+// HasDedicatedImageFs returns true if the imagefs has a dedicated device.
+func (cc *cadvisorClient) HasDedicatedImageFs() (bool, error) {
+	imageFsInfo, err := cc.ImagesFsInfo()
+	if err != nil {
+		return false, err
+	}
+	rootFsInfo, err := cc.RootFsInfo()
+	if err != nil {
+		return false, err
+	}
+	return imageFsInfo.Device != rootFsInfo.Device, nil
+}

View File

@@ -76,3 +76,7 @@ func (cu *cadvisorUnsupported) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
 func (cu *cadvisorUnsupported) WatchEvents(request *events.Request) (*events.EventChannel, error) {
 	return nil, unsupportedErr
 }
+
+func (cu *cadvisorUnsupported) HasDedicatedImageFs() (bool, error) {
+	return false, unsupportedErr
+}

View File

@@ -73,3 +73,7 @@ func (cu *cadvisorClient) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
 func (cu *cadvisorClient) WatchEvents(request *events.Request) (*events.EventChannel, error) {
 	return &events.EventChannel{}, nil
 }
+
+func (cu *cadvisorClient) HasDedicatedImageFs() (bool, error) {
+	return false, nil
+}

View File

@@ -73,3 +73,7 @@ func (c *Fake) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
 func (c *Fake) WatchEvents(request *events.Request) (*events.EventChannel, error) {
 	return new(events.EventChannel), nil
 }
+
+func (c *Fake) HasDedicatedImageFs() (bool, error) {
+	return false, nil
+}

View File

@@ -83,3 +83,8 @@ func (c *Mock) WatchEvents(request *events.Request) (*events.EventChannel, error
 	args := c.Called()
 	return args.Get(0).(*events.EventChannel), args.Error(1)
 }
+
+func (c *Mock) HasDedicatedImageFs() (bool, error) {
+	args := c.Called()
+	return args.Get(0).(bool), args.Error(1)
+}

View File

@@ -41,4 +41,7 @@ type Interface interface {
 	// Get events streamed through passedChannel that fit the request.
 	WatchEvents(request *events.Request) (*events.EventChannel, error)
+
+	// HasDedicatedImageFs returns true iff a dedicated image filesystem exists for storing images.
+	HasDedicatedImageFs() (bool, error)
 }

View File

@@ -60,6 +60,9 @@ type ContainerManager interface {
 	// GetNodeAllocatable returns the amount of compute resources that have to be reserved from scheduling.
 	GetNodeAllocatableReservation() v1.ResourceList

+	// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
+	GetCapacity() v1.ResourceList
+
 	// UpdateQOSCgroups performs housekeeping updates to ensure that the top
 	// level QoS containers have their desired state in a thread-safe way
 	UpdateQOSCgroups() error
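
Note: the hunks in this excerpt add GetCapacity to the interface and to the stub, but the concrete implementation on containerManagerImpl is not shown. Presumably it simply returns the capacity map that NewContainerManager computes below; a minimal sketch, assuming that field layout:

// Sketch only; the real method body is not part of this diff.
func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
	return cm.capacity
}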

View File

@@ -224,6 +224,25 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 	} else {
 		return nil, err
 	}
+
+	rootfs, err := cadvisorInterface.RootFsInfo()
+	if err != nil {
+		capacity[v1.ResourceStorageScratch] = resource.MustParse("0Gi")
+	} else {
+		for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
+			capacity[rName] = rCap
+		}
+	}
+	if hasDedicatedImageFs, _ := cadvisorInterface.HasDedicatedImageFs(); hasDedicatedImageFs {
+		imagesfs, err := cadvisorInterface.ImagesFsInfo()
+		if err != nil {
+			glog.Errorf("Failed to get Image filesystem information: %v", err)
+		} else {
+			for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
+				capacity[rName] = rCap
+			}
+		}
+	}

 	cgroupRoot := nodeConfig.CgroupRoot
 	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
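
The helpers cadvisor.StorageScratchCapacityFromFsInfo and cadvisor.StorageOverlayCapacityFromFsInfo are referenced above but their bodies are not part of this diff. Presumably they wrap the filesystem capacity (in bytes) reported by cAdvisor into the matching node resource quantity; a hedged sketch, assuming the usual cadvisorapiv2 alias for github.com/google/cadvisor/info/v2:

// Hedged sketch: not shown in this commit; assumed to map FsInfo.Capacity into a ResourceList.
func StorageScratchCapacityFromFsInfo(info cadvisorapiv2.FsInfo) v1.ResourceList {
	return v1.ResourceList{
		v1.ResourceStorageScratch: *resource.NewQuantity(int64(info.Capacity), resource.BinarySI),
	}
}

func StorageOverlayCapacityFromFsInfo(info cadvisorapiv2.FsInfo) v1.ResourceList {
	return v1.ResourceList{
		v1.ResourceStorageOverlay: *resource.NewQuantity(int64(info.Capacity), resource.BinarySI),
	}
}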

View File

@@ -58,6 +58,10 @@ func (cm *containerManagerStub) GetNodeAllocatableReservation() v1.ResourceList
 	return nil
 }

+func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
+	return nil
+}
+
 func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
 	return &podContainerManagerStub{}
 }

View File

@@ -29,7 +29,6 @@ import (
 	clientv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/events"
 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
 )
@@ -184,17 +183,6 @@ func (cm *containerManagerImpl) getNodeAllocatableAbsolute() v1.ResourceList {
 // GetNodeAllocatable returns amount of compute or storage resource that have to be reserved on this node from scheduling.
 func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
 	evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
-	if _, ok := cm.capacity[v1.ResourceStorage]; !ok {
-		if cm.cadvisorInterface != nil {
-			if rootfs, err := cm.cadvisorInterface.RootFsInfo(); err == nil {
-				for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
-					cm.capacity[rName] = rCap
-				}
-			} else {
-				glog.Warning("Error getting rootfs info: %v", err)
-			}
-		}
-	}
 	result := make(v1.ResourceList)
 	for k := range cm.capacity {
 		value := resource.NewQuantity(0, resource.DecimalSI)

View File

@@ -1301,7 +1301,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
 		glog.Fatalf("Failed to start cAdvisor %v", err)
 	}
 	// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
-	kl.evictionManager.Start(kl, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod)
+	kl.evictionManager.Start(kl.cadvisor, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod)
 }

 // Run starts the kubelet reacting to config updates

View File

@@ -45,19 +45,6 @@ func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, contai
 	return &ci, nil
 }

-// HasDedicatedImageFs returns true if the imagefs has a dedicated device.
-func (kl *Kubelet) HasDedicatedImageFs() (bool, error) {
-	imageFsInfo, err := kl.ImagesFsInfo()
-	if err != nil {
-		return false, err
-	}
-	rootFsInfo, err := kl.RootFsInfo()
-	if err != nil {
-		return false, err
-	}
-	return imageFsInfo.Device != rootFsInfo.Device, nil
-}
-
 // GetContainerInfoV2 returns stats (from Cadvisor) for containers.
 func (kl *Kubelet) GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
 	return kl.cadvisor.ContainerInfoV2(name, options)

View File

@@ -24,7 +24,6 @@ import (
 	"github.com/stretchr/testify/require"
 	cadvisorapi "github.com/google/cadvisor/info/v1"
-	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
 	"k8s.io/apimachinery/pkg/types"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
@@ -251,34 +250,3 @@ func TestGetRawContainerInfoSubcontainers(t *testing.T) {
 	assert.Len(t, result, 2)
 	mockCadvisor.AssertExpectations(t)
 }
-
-func TestHasDedicatedImageFs(t *testing.T) {
-	testCases := map[string]struct {
-		imageFsInfo cadvisorapiv2.FsInfo
-		rootFsInfo  cadvisorapiv2.FsInfo
-		expected    bool
-	}{
-		"has-dedicated-image-fs": {
-			imageFsInfo: cadvisorapiv2.FsInfo{Device: "123"},
-			rootFsInfo:  cadvisorapiv2.FsInfo{Device: "456"},
-			expected:    true,
-		},
-		"has-unified-image-fs": {
-			imageFsInfo: cadvisorapiv2.FsInfo{Device: "123"},
-			rootFsInfo:  cadvisorapiv2.FsInfo{Device: "123"},
-			expected:    false,
-		},
-	}
-	for testName, testCase := range testCases {
-		testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
-		defer testKubelet.Cleanup()
-		kubelet := testKubelet.kubelet
-		mockCadvisor := testKubelet.fakeCadvisor
-		mockCadvisor.On("Start").Return(nil)
-		mockCadvisor.On("ImagesFsInfo").Return(testCase.imageFsInfo, nil)
-		mockCadvisor.On("RootFsInfo").Return(testCase.rootFsInfo, nil)
-		actual, err := kubelet.HasDedicatedImageFs()
-		assert.NoError(t, err, "test [%s]", testName)
-		assert.Equal(t, testCase.expected, actual, "test [%s]", testName)
-	}
-}

View File

@@ -549,6 +549,7 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 			node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
 				int64(kl.maxPods), resource.DecimalSI)
 		}
 		if node.Status.NodeInfo.BootID != "" &&
 			node.Status.NodeInfo.BootID != info.BootID {
 			// TODO: This requires a transaction, either both node status is updated
@@ -557,25 +558,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 				"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
 		}
 		node.Status.NodeInfo.BootID = info.BootID
+	}

-		if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
-			rootfs, err := kl.GetCachedRootFsInfo()
-			if err != nil {
-				node.Status.Capacity[v1.ResourceStorageScratch] = resource.MustParse("0Gi")
-			} else {
-				for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
-					node.Status.Capacity[rName] = rCap
-				}
-			}
-			if hasDedicatedImageFs, _ := kl.HasDedicatedImageFs(); hasDedicatedImageFs {
-				imagesfs, err := kl.ImagesFsInfo()
-				if err != nil {
-					node.Status.Capacity[v1.ResourceStorageOverlay] = resource.MustParse("0Gi")
-				} else {
-					for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
-						node.Status.Capacity[rName] = rCap
-					}
-				}
-			}
-		}
-	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
+		// TODO: all the node resources should use GetCapacity instead of deriving the
+		// capacity for every node status request
+		initialCapacity := kl.containerManager.GetCapacity()
+		if initialCapacity != nil {
+			node.Status.Capacity[v1.ResourceStorageScratch] = initialCapacity[v1.ResourceStorageScratch]
+			imageCapacity, ok := initialCapacity[v1.ResourceStorageOverlay]
+			if ok {
+				node.Status.Capacity[v1.ResourceStorageOverlay] = imageCapacity
+			}
+		}
+	}

View File

@@ -115,12 +115,17 @@ func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error)
 type localCM struct {
 	cm.ContainerManager
 	allocatable v1.ResourceList
+	capacity    v1.ResourceList
 }

 func (lcm *localCM) GetNodeAllocatableReservation() v1.ResourceList {
 	return lcm.allocatable
 }

+func (lcm *localCM) GetCapacity() v1.ResourceList {
+	return lcm.capacity
+}
+
 func TestUpdateNewNodeStatus(t *testing.T) {
 	// generate one more than maxImagesInNodeStatus in inputImageList
 	inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1)
@@ -134,6 +139,10 @@ func TestUpdateNewNodeStatus(t *testing.T) {
 			v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
 			v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
 		},
+		capacity: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
+		},
 	}
 	kubeClient := testKubelet.fakeKubeClient
 	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
@@ -251,6 +260,18 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
 	defer testKubelet.Cleanup()
 	kubelet := testKubelet.kubelet
+	kubelet.containerManager = &localCM{
+		ContainerManager: cm.NewStubContainerManager(),
+		allocatable: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
+		},
+		capacity: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
+		},
+	}
+
 	kubeClient := testKubelet.fakeKubeClient
 	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
 	kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
@@ -319,6 +340,10 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 			v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
 			v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
 		},
+		capacity: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
+		},
 	}
 	kubeClient := testKubelet.fakeKubeClient
@@ -503,6 +528,18 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
 	defer testKubelet.Cleanup()
 	kubelet := testKubelet.kubelet
+	kubelet.containerManager = &localCM{
+		ContainerManager: cm.NewStubContainerManager(),
+		allocatable: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
+		},
+		capacity: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
+		},
+	}
+
 	clock := testKubelet.fakeClock
 	// Do not set nano second, because apiserver function doesn't support nano second. (Only support
 	// RFC3339).
@@ -658,6 +695,10 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
 			v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
 			v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
 		},
+		capacity: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
+		},
 	}
 	clock := testKubelet.fakeClock
@@ -1113,6 +1154,10 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
 		allocatable: v1.ResourceList{
 			v1.ResourceCPU: *resource.NewMilliQuantity(40000, resource.DecimalSI),
 		},
+		capacity: v1.ResourceList{
+			v1.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+			v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
+		},
 	}
 	kubeClient := testKubelet.fakeKubeClient
 	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}