From 017082654296f7575aff65a6b1e05172adf7ad62 Mon Sep 17 00:00:00 2001 From: Michael Taufen Date: Fri, 29 Jun 2018 15:48:51 -0700 Subject: [PATCH] port setVolumeLimits to Setter abstraction, add test --- pkg/kubelet/BUILD | 1 - pkg/kubelet/kubelet_node_status.go | 26 +------- pkg/kubelet/kubelet_node_status_test.go | 83 ------------------------- pkg/kubelet/nodestatus/BUILD | 3 + pkg/kubelet/nodestatus/setters.go | 28 +++++++++ pkg/kubelet/nodestatus/setters_test.go | 65 +++++++++++++++++++ 6 files changed, 97 insertions(+), 109 deletions(-) diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 0c4e7a8189f..c5474872841 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -164,7 +164,6 @@ go_test( deps = [ "//pkg/apis/core/install:go_default_library", "//pkg/capabilities:go_default_library", - "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/cadvisor/testing:go_default_library", diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index d8fa79bb9a3..600def8d694 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -329,30 +329,6 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { return node, nil } -// setVolumeLimits updates volume limits on the node -func (kl *Kubelet) setVolumeLimits(node *v1.Node) { - if node.Status.Capacity == nil { - node.Status.Capacity = v1.ResourceList{} - } - - if node.Status.Allocatable == nil { - node.Status.Allocatable = v1.ResourceList{} - } - - pluginWithLimits := kl.volumePluginMgr.ListVolumePluginWithLimits() - for _, volumePlugin := range pluginWithLimits { - attachLimits, err := volumePlugin.GetVolumeLimits() - if err != nil { - glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName()) - continue - } - for limitKey, value := range attachLimits { - node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) - node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) - } - } -} - // syncNodeStatus should be called periodically from a goroutine. // It synchronizes node status to master, registering the kubelet first if // necessary. @@ -506,7 +482,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error { nodestatus.GoRuntime(), ) if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { - setters = append(setters, withoutError(kl.setVolumeLimits)) + setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits)) } setters = append(setters, nodestatus.OutOfDiskCondition(kl.clock.Now, kl.recordNodeStatusEvent), diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 6085f043d40..4d0d049240a 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -46,7 +46,6 @@ import ( v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" core "k8s.io/client-go/testing" - fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" @@ -1475,85 +1474,3 @@ func TestValidateNodeIPParam(t *testing.T) { } } } - -func TestSetVolumeLimits(t *testing.T) { - testKubelet := newTestKubeletWithoutFakeVolumePlugin(t, false /* controllerAttachDetachEnabled */) - defer testKubelet.Cleanup() - kubelet := testKubelet.kubelet - kubelet.kubeClient = nil // ensure only the heartbeat client is used - kubelet.hostname = testKubeletHostname - - var testcases = []struct { - name string - cloudProviderName string - expectedVolumeKey string - expectedLimit int64 - }{ - { - name: "For default GCE cloudprovider", - cloudProviderName: "gce", - expectedVolumeKey: util.GCEVolumeLimitKey, - expectedLimit: 16, - }, - { - name: "For default AWS Cloudprovider", - cloudProviderName: "aws", - expectedVolumeKey: util.EBSVolumeLimitKey, - expectedLimit: 39, - }, - { - name: "for default Azure cloudprovider", - cloudProviderName: "azure", - expectedVolumeKey: util.AzureVolumeLimitKey, - expectedLimit: 16, - }, - { - name: "when no cloudprovider is present", - cloudProviderName: "", - expectedVolumeKey: util.AzureVolumeLimitKey, - expectedLimit: -1, - }, - } - for _, test := range testcases { - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)}, - Spec: v1.NodeSpec{}, - } - - if test.cloudProviderName != "" { - fakeCloud := &fakecloud.FakeCloud{ - Provider: test.cloudProviderName, - Err: nil, - } - kubelet.cloud = fakeCloud - } else { - kubelet.cloud = nil - } - - kubelet.setVolumeLimits(node) - nodeLimits := []v1.ResourceList{} - nodeLimits = append(nodeLimits, node.Status.Allocatable) - nodeLimits = append(nodeLimits, node.Status.Capacity) - for _, volumeLimits := range nodeLimits { - if test.expectedLimit == -1 { - _, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)] - if ok { - t.Errorf("Expected no volume limit found for %s", test.expectedVolumeKey) - } - } else { - fl, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)] - - if !ok { - t.Errorf("Expected to found volume limit for %s found none", test.expectedVolumeKey) - } - foundLimit, _ := fl.AsInt64() - expectedValue := resource.NewQuantity(test.expectedLimit, resource.DecimalSI) - if expectedValue.Cmp(fl) != 0 { - t.Errorf("Expected volume limit for %s to be %v found %v", test.expectedVolumeKey, test.expectedLimit, foundLimit) - } - } - - } - - } -} diff --git a/pkg/kubelet/nodestatus/BUILD b/pkg/kubelet/nodestatus/BUILD index 84930d6c2b2..d8c5f632d8e 100644 --- a/pkg/kubelet/nodestatus/BUILD +++ b/pkg/kubelet/nodestatus/BUILD @@ -15,6 +15,7 @@ go_library( "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/version:go_default_library", + "//pkg/volume:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -51,6 +52,8 @@ go_test( "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/util/sliceutils:go_default_library", "//pkg/version:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/kubelet/nodestatus/setters.go b/pkg/kubelet/nodestatus/setters.go index 83788a64d3c..abe6970ba24 100644 --- a/pkg/kubelet/nodestatus/setters.go +++ b/pkg/kubelet/nodestatus/setters.go @@ -40,6 +40,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/volume" "github.com/golang/glog" ) @@ -709,3 +710,30 @@ func VolumesInUse(syncedFunc func() bool, // typically Kubelet.volumeManager.Rec return nil } } + +// VolumeLimits returns a Setter that updates the volume limits on the node. +func VolumeLimits(volumePluginListFunc func() []volume.VolumePluginWithAttachLimits, // typically Kubelet.volumePluginMgr.ListVolumePluginWithLimits +) Setter { + return func(node *v1.Node) error { + if node.Status.Capacity == nil { + node.Status.Capacity = v1.ResourceList{} + } + if node.Status.Allocatable == nil { + node.Status.Allocatable = v1.ResourceList{} + } + + pluginWithLimits := volumePluginListFunc() + for _, volumePlugin := range pluginWithLimits { + attachLimits, err := volumePlugin.GetVolumeLimits() + if err != nil { + glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName()) + continue + } + for limitKey, value := range attachLimits { + node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) + node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) + } + } + return nil + } +} diff --git a/pkg/kubelet/nodestatus/setters_test.go b/pkg/kubelet/nodestatus/setters_test.go index c07330cfb7c..2952d2db098 100644 --- a/pkg/kubelet/nodestatus/setters_test.go +++ b/pkg/kubelet/nodestatus/setters_test.go @@ -40,6 +40,8 @@ import ( "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1371,6 +1373,69 @@ func TestVolumesInUse(t *testing.T) { } } +func TestVolumeLimits(t *testing.T) { + const ( + volumeLimitKey = "attachable-volumes-fake-provider" + volumeLimitVal = 16 + ) + + var cases = []struct { + desc string + volumePluginList []volume.VolumePluginWithAttachLimits + expectNode *v1.Node + }{ + { + desc: "translate limits to capacity and allocatable for plugins that return successfully from GetVolumeLimits", + volumePluginList: []volume.VolumePluginWithAttachLimits{ + &volumetest.FakeVolumePlugin{ + VolumeLimits: map[string]int64{volumeLimitKey: volumeLimitVal}, + }, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + volumeLimitKey: *resource.NewQuantity(volumeLimitVal, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + volumeLimitKey: *resource.NewQuantity(volumeLimitVal, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "skip plugins that return errors from GetVolumeLimits", + volumePluginList: []volume.VolumePluginWithAttachLimits{ + &volumetest.FakeVolumePlugin{ + VolumeLimitsError: fmt.Errorf("foo"), + }, + }, + expectNode: &v1.Node{}, + }, + { + desc: "no plugins", + expectNode: &v1.Node{}, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + volumePluginListFunc := func() []volume.VolumePluginWithAttachLimits { + return tc.volumePluginList + } + // construct setter + setter := VolumeLimits(volumePluginListFunc) + // call setter on node + node := &v1.Node{} + if err := setter(node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected node + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectNode, node), + "Diff: %s", diff.ObjectDiff(tc.expectNode, node)) + }) + } +} + // Test Helpers: // sortableNodeAddress is a type for sorting []v1.NodeAddress