Implement kubelet side changes for writing volume limit to node

Add tests for checking node limits
This commit is contained in:
Hemant Kumar 2018-05-23 12:51:29 -04:00
parent cf282203c3
commit 1f9404dfc0
4 changed files with 129 additions and 5 deletions

View File

@ -198,6 +198,9 @@ go_test(
"//pkg/util/mount:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/aws_ebs:go_default_library",
"//pkg/volume/azure_dd:go_default_library",
"//pkg/volume/gce_pd:go_default_library",
"//pkg/volume/host_path:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",

View File

@ -326,6 +326,30 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
return node, nil
}
// setVolumeLimits updates volume limits on the node
func (kl *Kubelet) setVolumeLimits(node *v1.Node) {
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
pluginWithLimits := kl.volumePluginMgr.ListVolumePluginWithLimits()
for _, volumePlugin := range pluginWithLimits {
attachLimits, err := volumePlugin.GetVolumeLimits()
if err != nil {
glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName())
continue
}
for limitKey, value := range attachLimits {
node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
}
}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
@ -751,6 +775,9 @@ func (kl *Kubelet) setNodeStatusInfo(node *v1.Node) {
kl.setNodeStatusDaemonEndpoints(node)
kl.setNodeStatusImages(node)
kl.setNodeStatusGoRuntime(node)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
kl.setVolumeLimits(node)
}
}
// Set Ready condition for the node.

View File

@ -331,7 +331,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
inputImageList, expectedImageList := generateTestingImageLists(numTestImages, int(tc.nodeStatusMaxImages))
testKubelet := newTestKubeletWithImageList(
t, inputImageList, false /* controllerAttachDetachEnabled */)
t, inputImageList, false /* controllerAttachDetachEnabled */, true /*initFakeVolumePlugin*/)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.nodeStatusMaxImages = tc.nodeStatusMaxImages
@ -1252,7 +1252,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
// generate one more in inputImageList than we configure the Kubelet to report
inputImageList, _ := generateTestingImageLists(nodeStatusMaxImages+1, nodeStatusMaxImages)
testKubelet := newTestKubeletWithImageList(
t, inputImageList, false /* controllerAttachDetachEnabled */)
t, inputImageList, false /* controllerAttachDetachEnabled */, true /* initFakeVolumePlugin */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.nodeStatusMaxImages = nodeStatusMaxImages
@ -1616,3 +1616,68 @@ func TestValidateNodeIPParam(t *testing.T) {
}
}
}
func TestSetVolumeLimits(t *testing.T) {
testKubelet := newTestKubeletWithoutFakeVolumePlugin(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.hostname = testKubeletHostname
var testcases = []struct {
name string
cloudProviderName string
expectedVolumeKey string
expectedLimit int64
}{
{
name: "For default GCE cloudprovider",
cloudProviderName: "gce",
expectedVolumeKey: util.GCEVolumeLimitKey,
expectedLimit: 16,
},
{
name: "For default AWS Cloudprovider",
cloudProviderName: "aws",
expectedVolumeKey: util.EBSVolumeLimitKey,
expectedLimit: 39,
},
{
name: "for default Azure cloudprovider",
cloudProviderName: "azure",
expectedVolumeKey: util.AzureVolumeLimitKey,
expectedLimit: 16,
},
}
for _, test := range testcases {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)},
Spec: v1.NodeSpec{},
}
fakeCloud := &fakecloud.FakeCloud{
Provider: test.cloudProviderName,
Err: nil,
}
kubelet.cloud = fakeCloud
kubelet.cloudproviderRequestParallelism = make(chan int, 1)
kubelet.cloudproviderRequestSync = make(chan int)
kubelet.cloudproviderRequestTimeout = 10 * time.Second
kubelet.setVolumeLimits(node)
nodeLimits := []v1.ResourceList{}
nodeLimits = append(nodeLimits, node.Status.Allocatable)
nodeLimits = append(nodeLimits, node.Status.Capacity)
for _, volumeLimits := range nodeLimits {
fl, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)]
if !ok {
t.Errorf("Expected to found volume limit for %s found none", test.expectedVolumeKey)
}
foundLimit, _ := fl.AsInt64()
expectedValue := resource.NewQuantity(test.expectedLimit, resource.DecimalSI)
if expectedValue.Cmp(fl) != 0 {
t.Errorf("Expected volume limit for %s to be %v found %v", test.expectedVolumeKey, test.expectedLimit, foundLimit)
}
}
}
}

View File

@ -67,6 +67,9 @@ import (
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/aws_ebs"
"k8s.io/kubernetes/pkg/volume/azure_dd"
"k8s.io/kubernetes/pkg/volume/gce_pd"
_ "k8s.io/kubernetes/pkg/volume/host_path"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
@ -133,13 +136,30 @@ func newTestKubelet(t *testing.T, controllerAttachDetachEnabled bool) *TestKubel
Size: 456,
},
}
return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled)
return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled, true /*initFakeVolumePlugin*/)
}
func newTestKubeletWithoutFakeVolumePlugin(t *testing.T, controllerAttachDetachEnabled bool) *TestKubelet {
imageList := []kubecontainer.Image{
{
ID: "abc",
RepoTags: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"},
Size: 123,
},
{
ID: "efg",
RepoTags: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"},
Size: 456,
},
}
return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled, false /*initFakeVolumePlugin*/)
}
func newTestKubeletWithImageList(
t *testing.T,
imageList []kubecontainer.Image,
controllerAttachDetachEnabled bool) *TestKubelet {
controllerAttachDetachEnabled bool,
initFakeVolumePlugin bool) *TestKubelet {
fakeRuntime := &containertest.FakeRuntime{}
fakeRuntime.RuntimeType = "test"
fakeRuntime.VersionInfo = "1.5.0"
@ -293,10 +313,19 @@ func newTestKubeletWithImageList(
// Add this as cleanup predicate pod admitter
kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))
allPlugins := []volume.VolumePlugin{}
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
if initFakeVolumePlugin {
allPlugins = append(allPlugins, plug)
} else {
allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...)
}
var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
kubelet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, []volume.VolumePlugin{plug}, prober)
NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, allPlugins, prober)
require.NoError(t, err, "Failed to initialize VolumePluginMgr")
kubelet.mounter = &mount.FakeMounter{}