Merge pull request #48123 from msau42/fix-allocatable-race

Automatic merge from submit-queue (batch tested with PRs 48123, 48079)

[Kubelet] Fix race condition in container manager

**What this PR does / why we need it**:

This fixes a race condition where the container manager's capacity map was updated without synchronization. It moves storage capacity detection to kubelet initialization, which runs serially in a single thread, so the map is fully populated before it can be read concurrently.
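
For illustration only, a minimal, hypothetical Go sketch of the pattern (none of these identifiers are the kubelet's real ones): filling a shared capacity map lazily from request paths lets concurrent callers write while others read, whereas filling it once during single-threaded construction leaves only readers afterwards.

```go
package main

import "fmt"

// resourceList is a stand-in for v1.ResourceList in this sketch.
type resourceList map[string]int64

type containerManager struct {
	// capacity is written only during construction and read afterwards,
	// so no lock is needed once newContainerManager returns.
	capacity resourceList
}

// newContainerManager detects storage capacity up front, in the single
// thread that builds the manager, instead of lazily from request paths
// where concurrent callers would write the map while others read it.
func newContainerManager(detectScratch func() int64) *containerManager {
	cm := &containerManager{capacity: resourceList{}}
	cm.capacity["storage.scratch"] = detectScratch() // done once, serially
	return cm
}

// GetCapacity is a read-only accessor; safe for concurrent use because
// there are no writers after construction.
func (cm *containerManager) GetCapacity() resourceList {
	return cm.capacity
}

func main() {
	cm := newContainerManager(func() int64 { return 100 << 30 }) // pretend 100Gi of scratch space
	fmt.Println(cm.GetCapacity()["storage.scratch"])
}
```

The actual change follows the same idea: scratch and overlay capacity are detected in `NewContainerManager` (see the diff below) and exposed through the new `GetCapacity` method.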

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #48045

**Release note**:

```release-note
Fixes kubelet race condition in container manager.
```
Kubernetes Submit Queue 2017-06-28 02:00:07 -07:00 committed by GitHub
commit 13a7fdc83f
15 changed files with 115 additions and 76 deletions


@@ -225,3 +225,16 @@ func (cc *cadvisorClient) getFsInfo(label string) (cadvisorapiv2.FsInfo, error)
func (cc *cadvisorClient) WatchEvents(request *events.Request) (*events.EventChannel, error) {
return cc.WatchForEvents(request)
}
// HasDedicatedImageFs returns true if the imagefs has a dedicated device.
func (cc *cadvisorClient) HasDedicatedImageFs() (bool, error) {
imageFsInfo, err := cc.ImagesFsInfo()
if err != nil {
return false, err
}
rootFsInfo, err := cc.RootFsInfo()
if err != nil {
return false, err
}
return imageFsInfo.Device != rootFsInfo.Device, nil
}


@@ -76,3 +76,7 @@ func (cu *cadvisorUnsupported) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
func (cu *cadvisorUnsupported) WatchEvents(request *events.Request) (*events.EventChannel, error) {
return nil, unsupportedErr
}
func (cu *cadvisorUnsupported) HasDedicatedImageFs() (bool, error) {
return false, unsupportedErr
}


@@ -73,3 +73,7 @@ func (cu *cadvisorClient) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
func (cu *cadvisorClient) WatchEvents(request *events.Request) (*events.EventChannel, error) {
return &events.EventChannel{}, nil
}
func (cu *cadvisorClient) HasDedicatedImageFs() (bool, error) {
return false, nil
}


@@ -73,3 +73,7 @@ func (c *Fake) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
func (c *Fake) WatchEvents(request *events.Request) (*events.EventChannel, error) {
return new(events.EventChannel), nil
}
func (c *Fake) HasDedicatedImageFs() (bool, error) {
return false, nil
}


@@ -83,3 +83,8 @@ func (c *Mock) WatchEvents(request *events.Request) (*events.EventChannel, error
args := c.Called()
return args.Get(0).(*events.EventChannel), args.Error(1)
}
func (c *Mock) HasDedicatedImageFs() (bool, error) {
args := c.Called()
return args.Get(0).(bool), args.Error(1)
}


@@ -41,4 +41,7 @@ type Interface interface {
// Get events streamed through passedChannel that fit the request.
WatchEvents(request *events.Request) (*events.EventChannel, error)
// HasDedicatedImageFs returns true iff a dedicated image filesystem exists for storing images.
HasDedicatedImageFs() (bool, error)
}


@@ -60,6 +60,9 @@ type ContainerManager interface {
// GetNodeAllocatable returns the amount of compute resources that have to be reserved from scheduling.
GetNodeAllocatableReservation() v1.ResourceList
// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
GetCapacity() v1.ResourceList
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error


@@ -224,6 +224,25 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
} else {
return nil, err
}
rootfs, err := cadvisorInterface.RootFsInfo()
if err != nil {
capacity[v1.ResourceStorageScratch] = resource.MustParse("0Gi")
} else {
for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
capacity[rName] = rCap
}
}
if hasDedicatedImageFs, _ := cadvisorInterface.HasDedicatedImageFs(); hasDedicatedImageFs {
imagesfs, err := cadvisorInterface.ImagesFsInfo()
if err != nil {
glog.Errorf("Failed to get Image filesystem information: %v", err)
} else {
for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
capacity[rName] = rCap
}
}
}
cgroupRoot := nodeConfig.CgroupRoot
cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)


@@ -58,6 +58,10 @@ func (cm *containerManagerStub) GetNodeAllocatableReservation() v1.ResourceList
return nil
}
func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return nil
}
func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}


@@ -29,7 +29,6 @@ import (
clientv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)
@@ -184,17 +183,6 @@ func (cm *containerManagerImpl) getNodeAllocatableAbsolute() v1.ResourceList {
// GetNodeAllocatable returns amount of compute or storage resource that have to be reserved on this node from scheduling.
func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
if _, ok := cm.capacity[v1.ResourceStorage]; !ok {
if cm.cadvisorInterface != nil {
if rootfs, err := cm.cadvisorInterface.RootFsInfo(); err == nil {
for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
cm.capacity[rName] = rCap
}
} else {
glog.Warning("Error getting rootfs info: %v", err)
}
}
}
result := make(v1.ResourceList)
for k := range cm.capacity {
value := resource.NewQuantity(0, resource.DecimalSI)


@@ -1301,7 +1301,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
glog.Fatalf("Failed to start cAdvisor %v", err)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod)
kl.evictionManager.Start(kl.cadvisor, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod)
}
// Run starts the kubelet reacting to config updates
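
As a hedged illustration of the dependency change above (simplified stand-in types, not the real kubelet or cAdvisor packages): once `HasDedicatedImageFs` lives on the cadvisor interface, a consumer such as the eviction manager can be handed the cadvisor client directly instead of the whole `Kubelet`.

```go
package main

import "fmt"

// fsInfo is a stand-in for cadvisorapiv2.FsInfo in this sketch.
type fsInfo struct{ Device string }

// cadvisorInterface mirrors the shape of the kubelet's cadvisor interface
// for the methods used here; names are illustrative only.
type cadvisorInterface interface {
	RootFsInfo() (fsInfo, error)
	ImagesFsInfo() (fsInfo, error)
	HasDedicatedImageFs() (bool, error)
}

type fakeCadvisor struct{ root, images fsInfo }

func (c fakeCadvisor) RootFsInfo() (fsInfo, error)   { return c.root, nil }
func (c fakeCadvisor) ImagesFsInfo() (fsInfo, error) { return c.images, nil }

// HasDedicatedImageFs reports whether images live on their own device.
func (c fakeCadvisor) HasDedicatedImageFs() (bool, error) {
	return c.images.Device != c.root.Device, nil
}

// startEviction stands in for the eviction manager: it only needs the
// narrow cadvisor interface, not the Kubelet object.
func startEviction(ca cadvisorInterface) {
	dedicated, _ := ca.HasDedicatedImageFs()
	fmt.Println("dedicated imagefs:", dedicated)
}

func main() {
	startEviction(fakeCadvisor{root: fsInfo{Device: "sda1"}, images: fsInfo{Device: "sdb1"}})
}
```

This mirrors why `kl.evictionManager.Start` now receives `kl.cadvisor` rather than `kl` in the hunk above.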


@@ -45,19 +45,6 @@ func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, contai
return &ci, nil
}
// HasDedicatedImageFs returns true if the imagefs has a dedicated device.
func (kl *Kubelet) HasDedicatedImageFs() (bool, error) {
imageFsInfo, err := kl.ImagesFsInfo()
if err != nil {
return false, err
}
rootFsInfo, err := kl.RootFsInfo()
if err != nil {
return false, err
}
return imageFsInfo.Device != rootFsInfo.Device, nil
}
// GetContainerInfoV2 returns stats (from Cadvisor) for containers.
func (kl *Kubelet) GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
return kl.cadvisor.ContainerInfoV2(name, options)


@@ -24,7 +24,6 @@ import (
"github.com/stretchr/testify/require"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
@@ -251,34 +250,3 @@ func TestGetRawContainerInfoSubcontainers(t *testing.T) {
assert.Len(t, result, 2)
mockCadvisor.AssertExpectations(t)
}
func TestHasDedicatedImageFs(t *testing.T) {
testCases := map[string]struct {
imageFsInfo cadvisorapiv2.FsInfo
rootFsInfo cadvisorapiv2.FsInfo
expected bool
}{
"has-dedicated-image-fs": {
imageFsInfo: cadvisorapiv2.FsInfo{Device: "123"},
rootFsInfo: cadvisorapiv2.FsInfo{Device: "456"},
expected: true,
},
"has-unified-image-fs": {
imageFsInfo: cadvisorapiv2.FsInfo{Device: "123"},
rootFsInfo: cadvisorapiv2.FsInfo{Device: "123"},
expected: false,
},
}
for testName, testCase := range testCases {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("ImagesFsInfo").Return(testCase.imageFsInfo, nil)
mockCadvisor.On("RootFsInfo").Return(testCase.rootFsInfo, nil)
actual, err := kubelet.HasDedicatedImageFs()
assert.NoError(t, err, "test [%s]", testName)
assert.Equal(t, testCase.expected, actual, "test [%s]", testName)
}
}


@@ -549,6 +549,7 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
int64(kl.maxPods), resource.DecimalSI)
}
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
@@ -557,25 +558,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
rootfs, err := kl.GetCachedRootFsInfo()
if err != nil {
node.Status.Capacity[v1.ResourceStorageScratch] = resource.MustParse("0Gi")
} else {
for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
node.Status.Capacity[rName] = rCap
}
}
if hasDedicatedImageFs, _ := kl.HasDedicatedImageFs(); hasDedicatedImageFs {
imagesfs, err := kl.ImagesFsInfo()
if err != nil {
node.Status.Capacity[v1.ResourceStorageOverlay] = resource.MustParse("0Gi")
} else {
for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
node.Status.Capacity[rName] = rCap
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
// TODO: all the node resources should use GetCapacity instead of deriving the
// capacity for every node status request
initialCapacity := kl.containerManager.GetCapacity()
if initialCapacity != nil {
node.Status.Capacity[v1.ResourceStorageScratch] = initialCapacity[v1.ResourceStorageScratch]
imageCapacity, ok := initialCapacity[v1.ResourceStorageOverlay]
if ok {
node.Status.Capacity[v1.ResourceStorageOverlay] = imageCapacity
}
}
}


@@ -115,12 +115,17 @@ func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error)
type localCM struct {
cm.ContainerManager
allocatable v1.ResourceList
capacity v1.ResourceList
}
func (lcm *localCM) GetNodeAllocatableReservation() v1.ResourceList {
return lcm.allocatable
}
func (lcm *localCM) GetCapacity() v1.ResourceList {
return lcm.capacity
}
func TestUpdateNewNodeStatus(t *testing.T) {
// generate one more than maxImagesInNodeStatus in inputImageList
inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1)
@@ -134,6 +139,10 @@ func TestUpdateNewNodeStatus(t *testing.T) {
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
},
}
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
@@ -251,6 +260,18 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
},
}
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
@@ -319,6 +340,10 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
},
}
kubeClient := testKubelet.fakeKubeClient
@@ -503,6 +528,18 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
},
}
clock := testKubelet.fakeClock
// Do not set nano second, because apiserver function doesn't support nano second. (Only support
// RFC3339).
@@ -658,6 +695,10 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
},
}
clock := testKubelet.fakeClock
@@ -1113,6 +1154,10 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(40000, resource.DecimalSI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
},
}
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}