From 1f9404dfc0da20ee74529258e55f9a57f81218c3 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 23 May 2018 12:51:29 -0400 Subject: [PATCH] Implement kubelet side changes for writing volume limit to node Add tests for checking node limits --- pkg/kubelet/BUILD | 3 ++ pkg/kubelet/kubelet_node_status.go | 27 ++++++++++ pkg/kubelet/kubelet_node_status_test.go | 69 ++++++++++++++++++++++++- pkg/kubelet/kubelet_test.go | 35 +++++++++++-- 4 files changed, 129 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index e63c5dc958f..ef4c3a5dafe 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -198,6 +198,9 @@ go_test( "//pkg/util/mount:go_default_library", "//pkg/version:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/aws_ebs:go_default_library", + "//pkg/volume/azure_dd:go_default_library", + "//pkg/volume/gce_pd:go_default_library", "//pkg/volume/host_path:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 47400d3baf5..0eda88fbed8 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -326,6 +326,30 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { return node, nil } +// setVolumeLimits updates volume limits on the node +func (kl *Kubelet) setVolumeLimits(node *v1.Node) { + if node.Status.Capacity == nil { + node.Status.Capacity = v1.ResourceList{} + } + + if node.Status.Allocatable == nil { + node.Status.Allocatable = v1.ResourceList{} + } + + pluginWithLimits := kl.volumePluginMgr.ListVolumePluginWithLimits() + for _, volumePlugin := range pluginWithLimits { + attachLimits, err := volumePlugin.GetVolumeLimits() + if err != nil { + glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName()) + continue + } + for limitKey, value := range attachLimits { + node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) + node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) + } + } +} + // syncNodeStatus should be called periodically from a goroutine. // It synchronizes node status to master, registering the kubelet first if // necessary. @@ -751,6 +775,9 @@ func (kl *Kubelet) setNodeStatusInfo(node *v1.Node) { kl.setNodeStatusDaemonEndpoints(node) kl.setNodeStatusImages(node) kl.setNodeStatusGoRuntime(node) + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + kl.setVolumeLimits(node) + } } // Set Ready condition for the node. diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 034560f14a3..d7115764284 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -331,7 +331,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { } inputImageList, expectedImageList := generateTestingImageLists(numTestImages, int(tc.nodeStatusMaxImages)) testKubelet := newTestKubeletWithImageList( - t, inputImageList, false /* controllerAttachDetachEnabled */) + t, inputImageList, false /* controllerAttachDetachEnabled */, true /*initFakeVolumePlugin*/) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = tc.nodeStatusMaxImages @@ -1252,7 +1252,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { // generate one more in inputImageList than we configure the Kubelet to report inputImageList, _ := generateTestingImageLists(nodeStatusMaxImages+1, nodeStatusMaxImages) testKubelet := newTestKubeletWithImageList( - t, inputImageList, false /* controllerAttachDetachEnabled */) + t, inputImageList, false /* controllerAttachDetachEnabled */, true /* initFakeVolumePlugin */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = nodeStatusMaxImages @@ -1616,3 +1616,68 @@ func TestValidateNodeIPParam(t *testing.T) { } } } + +func TestSetVolumeLimits(t *testing.T) { + testKubelet := newTestKubeletWithoutFakeVolumePlugin(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used + kubelet.hostname = testKubeletHostname + + var testcases = []struct { + name string + cloudProviderName string + expectedVolumeKey string + expectedLimit int64 + }{ + { + name: "For default GCE cloudprovider", + cloudProviderName: "gce", + expectedVolumeKey: util.GCEVolumeLimitKey, + expectedLimit: 16, + }, + { + name: "For default AWS Cloudprovider", + cloudProviderName: "aws", + expectedVolumeKey: util.EBSVolumeLimitKey, + expectedLimit: 39, + }, + { + name: "for default Azure cloudprovider", + cloudProviderName: "azure", + expectedVolumeKey: util.AzureVolumeLimitKey, + expectedLimit: 16, + }, + } + for _, test := range testcases { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)}, + Spec: v1.NodeSpec{}, + } + + fakeCloud := &fakecloud.FakeCloud{ + Provider: test.cloudProviderName, + Err: nil, + } + kubelet.cloud = fakeCloud + kubelet.cloudproviderRequestParallelism = make(chan int, 1) + kubelet.cloudproviderRequestSync = make(chan int) + kubelet.cloudproviderRequestTimeout = 10 * time.Second + kubelet.setVolumeLimits(node) + nodeLimits := []v1.ResourceList{} + nodeLimits = append(nodeLimits, node.Status.Allocatable) + nodeLimits = append(nodeLimits, node.Status.Capacity) + for _, volumeLimits := range nodeLimits { + fl, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)] + if !ok { + t.Errorf("Expected to found volume limit for %s found none", test.expectedVolumeKey) + } + foundLimit, _ := fl.AsInt64() + expectedValue := resource.NewQuantity(test.expectedLimit, resource.DecimalSI) + if expectedValue.Cmp(fl) != 0 { + t.Errorf("Expected volume limit for %s to be %v found %v", test.expectedVolumeKey, test.expectedLimit, foundLimit) + } + } + + } +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 33b870c0b3c..3876f245152 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -67,6 +67,9 @@ import ( schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/aws_ebs" + "k8s.io/kubernetes/pkg/volume/azure_dd" + "k8s.io/kubernetes/pkg/volume/gce_pd" _ "k8s.io/kubernetes/pkg/volume/host_path" volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" @@ -133,13 +136,30 @@ func newTestKubelet(t *testing.T, controllerAttachDetachEnabled bool) *TestKubel Size: 456, }, } - return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled) + return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled, true /*initFakeVolumePlugin*/) +} + +func newTestKubeletWithoutFakeVolumePlugin(t *testing.T, controllerAttachDetachEnabled bool) *TestKubelet { + imageList := []kubecontainer.Image{ + { + ID: "abc", + RepoTags: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"}, + Size: 123, + }, + { + ID: "efg", + RepoTags: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"}, + Size: 456, + }, + } + return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled, false /*initFakeVolumePlugin*/) } func newTestKubeletWithImageList( t *testing.T, imageList []kubecontainer.Image, - controllerAttachDetachEnabled bool) *TestKubelet { + controllerAttachDetachEnabled bool, + initFakeVolumePlugin bool) *TestKubelet { fakeRuntime := &containertest.FakeRuntime{} fakeRuntime.RuntimeType = "test" fakeRuntime.VersionInfo = "1.5.0" @@ -293,10 +313,19 @@ func newTestKubeletWithImageList( // Add this as cleanup predicate pod admitter kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources)) + allPlugins := []volume.VolumePlugin{} plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} + if initFakeVolumePlugin { + allPlugins = append(allPlugins, plug) + } else { + allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...) + } + var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock kubelet.volumePluginMgr, err = - NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, []volume.VolumePlugin{plug}, prober) + NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, allPlugins, prober) require.NoError(t, err, "Failed to initialize VolumePluginMgr") kubelet.mounter = &mount.FakeMounter{}