port setVolumeLimits to Setter abstraction, add test

This commit is contained in:
Michael Taufen 2018-06-29 15:48:51 -07:00
parent 36f7bc76bf
commit 0170826542
6 changed files with 97 additions and 109 deletions

View File

@ -164,7 +164,6 @@ go_test(
deps = [
"//pkg/apis/core/install:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/cadvisor/testing:go_default_library",

View File

@ -329,30 +329,6 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
return node, nil
}
// setVolumeLimits updates volume limits on the node
func (kl *Kubelet) setVolumeLimits(node *v1.Node) {
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
pluginWithLimits := kl.volumePluginMgr.ListVolumePluginWithLimits()
for _, volumePlugin := range pluginWithLimits {
attachLimits, err := volumePlugin.GetVolumeLimits()
if err != nil {
glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName())
continue
}
for limitKey, value := range attachLimits {
node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
}
}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
@ -506,7 +482,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
nodestatus.GoRuntime(),
)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
setters = append(setters, withoutError(kl.setVolumeLimits))
setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
}
setters = append(setters,
nodestatus.OutOfDiskCondition(kl.clock.Now, kl.recordNodeStatusEvent),

View File

@ -46,7 +46,6 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
@ -1475,85 +1474,3 @@ func TestValidateNodeIPParam(t *testing.T) {
}
}
}
func TestSetVolumeLimits(t *testing.T) {
testKubelet := newTestKubeletWithoutFakeVolumePlugin(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.hostname = testKubeletHostname
var testcases = []struct {
name string
cloudProviderName string
expectedVolumeKey string
expectedLimit int64
}{
{
name: "For default GCE cloudprovider",
cloudProviderName: "gce",
expectedVolumeKey: util.GCEVolumeLimitKey,
expectedLimit: 16,
},
{
name: "For default AWS Cloudprovider",
cloudProviderName: "aws",
expectedVolumeKey: util.EBSVolumeLimitKey,
expectedLimit: 39,
},
{
name: "for default Azure cloudprovider",
cloudProviderName: "azure",
expectedVolumeKey: util.AzureVolumeLimitKey,
expectedLimit: 16,
},
{
name: "when no cloudprovider is present",
cloudProviderName: "",
expectedVolumeKey: util.AzureVolumeLimitKey,
expectedLimit: -1,
},
}
for _, test := range testcases {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)},
Spec: v1.NodeSpec{},
}
if test.cloudProviderName != "" {
fakeCloud := &fakecloud.FakeCloud{
Provider: test.cloudProviderName,
Err: nil,
}
kubelet.cloud = fakeCloud
} else {
kubelet.cloud = nil
}
kubelet.setVolumeLimits(node)
nodeLimits := []v1.ResourceList{}
nodeLimits = append(nodeLimits, node.Status.Allocatable)
nodeLimits = append(nodeLimits, node.Status.Capacity)
for _, volumeLimits := range nodeLimits {
if test.expectedLimit == -1 {
_, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)]
if ok {
t.Errorf("Expected no volume limit found for %s", test.expectedVolumeKey)
}
} else {
fl, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)]
if !ok {
t.Errorf("Expected to found volume limit for %s found none", test.expectedVolumeKey)
}
foundLimit, _ := fl.AsInt64()
expectedValue := resource.NewQuantity(test.expectedLimit, resource.DecimalSI)
if expectedValue.Cmp(fl) != 0 {
t.Errorf("Expected volume limit for %s to be %v found %v", test.expectedVolumeKey, test.expectedLimit, foundLimit)
}
}
}
}
}

View File

@ -15,6 +15,7 @@ go_library(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -51,6 +52,8 @@ go_test(
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -40,6 +40,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
)
@ -709,3 +710,30 @@ func VolumesInUse(syncedFunc func() bool, // typically Kubelet.volumeManager.Rec
return nil
}
}
// VolumeLimits returns a Setter that updates the volume limits on the node.
func VolumeLimits(volumePluginListFunc func() []volume.VolumePluginWithAttachLimits, // typically Kubelet.volumePluginMgr.ListVolumePluginWithLimits
) Setter {
return func(node *v1.Node) error {
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
pluginWithLimits := volumePluginListFunc()
for _, volumePlugin := range pluginWithLimits {
attachLimits, err := volumePlugin.GetVolumeLimits()
if err != nil {
glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName())
continue
}
for limitKey, value := range attachLimits {
node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
}
}
return nil
}
}

View File

@ -40,6 +40,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -1371,6 +1373,69 @@ func TestVolumesInUse(t *testing.T) {
}
}
func TestVolumeLimits(t *testing.T) {
const (
volumeLimitKey = "attachable-volumes-fake-provider"
volumeLimitVal = 16
)
var cases = []struct {
desc string
volumePluginList []volume.VolumePluginWithAttachLimits
expectNode *v1.Node
}{
{
desc: "translate limits to capacity and allocatable for plugins that return successfully from GetVolumeLimits",
volumePluginList: []volume.VolumePluginWithAttachLimits{
&volumetest.FakeVolumePlugin{
VolumeLimits: map[string]int64{volumeLimitKey: volumeLimitVal},
},
},
expectNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
volumeLimitKey: *resource.NewQuantity(volumeLimitVal, resource.DecimalSI),
},
Allocatable: v1.ResourceList{
volumeLimitKey: *resource.NewQuantity(volumeLimitVal, resource.DecimalSI),
},
},
},
},
{
desc: "skip plugins that return errors from GetVolumeLimits",
volumePluginList: []volume.VolumePluginWithAttachLimits{
&volumetest.FakeVolumePlugin{
VolumeLimitsError: fmt.Errorf("foo"),
},
},
expectNode: &v1.Node{},
},
{
desc: "no plugins",
expectNode: &v1.Node{},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
volumePluginListFunc := func() []volume.VolumePluginWithAttachLimits {
return tc.volumePluginList
}
// construct setter
setter := VolumeLimits(volumePluginListFunc)
// call setter on node
node := &v1.Node{}
if err := setter(node); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// check expected node
assert.True(t, apiequality.Semantic.DeepEqual(tc.expectNode, node),
"Diff: %s", diff.ObjectDiff(tc.expectNode, node))
})
}
}
// Test Helpers:
// sortableNodeAddress is a type for sorting []v1.NodeAddress