Update nodeinfomanager to store volume limits in CSINode

This commit is contained in:
Fabio Bertinatto 2019-05-22 10:08:14 +02:00
parent b90ca5b2a7
commit 33c8bacd41
4 changed files with 176 additions and 65 deletions

View File

@ -13,7 +13,6 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
@ -62,5 +61,6 @@ go_test(
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -22,14 +22,14 @@ import (
"encoding/json"
goerrors "errors"
"fmt"
"math"
"strings"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -117,17 +117,13 @@ func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID str
nodeUpdateFuncs = append(nodeUpdateFuncs, updateTopologyLabels(topology))
}
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
nodeUpdateFuncs = append(nodeUpdateFuncs, updateMaxAttachLimit(driverName, maxAttachLimit))
}
err := nim.updateNode(nodeUpdateFuncs...)
if err != nil {
return fmt.Errorf("error updating Node object with CSI driver node info: %v", err)
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
err = nim.updateCSINode(driverName, driverNodeID, topology)
err = nim.updateCSINode(driverName, driverNodeID, maxAttachLimit, topology)
if err != nil {
return fmt.Errorf("error updating CSINode object with CSI driver node info: %v", err)
}
@ -354,6 +350,7 @@ func updateTopologyLabels(topology map[string]string) nodeUpdateFunc {
func (nim *nodeInfoManager) updateCSINode(
driverName string,
driverNodeID string,
maxAttachLimit int64,
topology map[string]string) error {
csiKubeClient := nim.volumeHost.GetKubeClient()
@ -363,7 +360,7 @@ func (nim *nodeInfoManager) updateCSINode(
var updateErrs []error
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, topology); err != nil {
if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, maxAttachLimit, topology); err != nil {
updateErrs = append(updateErrs, err)
return false, nil
}
@ -379,6 +376,7 @@ func (nim *nodeInfoManager) tryUpdateCSINode(
csiKubeClient clientset.Interface,
driverName string,
driverNodeID string,
maxAttachLimit int64,
topology map[string]string) error {
nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{})
@ -389,7 +387,7 @@ func (nim *nodeInfoManager) tryUpdateCSINode(
return err
}
return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology)
return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, maxAttachLimit, topology)
}
func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
@ -515,6 +513,7 @@ func (nim *nodeInfoManager) installDriverToCSINode(
nodeInfo *storagev1beta1.CSINode,
driverName string,
driverNodeID string,
maxAttachLimit int64,
topology map[string]string) error {
csiKubeClient := nim.volumeHost.GetKubeClient()
@ -555,6 +554,19 @@ func (nim *nodeInfoManager) installDriverToCSINode(
TopologyKeys: topologyKeys.List(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
if maxAttachLimit > 0 {
if maxAttachLimit > math.MaxInt32 {
klog.Warningf("Exceeded max supported attach limit value, truncating it to %d", math.MaxInt32)
maxAttachLimit = math.MaxInt32
}
m := int32(maxAttachLimit)
driverSpec.Allocatable = &storagev1beta1.VolumeNodeResources{Count: &m}
} else {
klog.Errorf("Invalid attach limit value %d cannot be added to CSINode object for %q", maxAttachLimit, driverName)
}
}
newDriverSpecs = append(newDriverSpecs, driverSpec)
nodeInfo.Spec.Drivers = newDriverSpecs
@ -621,27 +633,6 @@ func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
}
func updateMaxAttachLimit(driverName string, maxLimit int64) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
if maxLimit <= 0 {
klog.V(4).Infof("skipping adding attach limit for %s", driverName)
return node, false, nil
}
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
limitKeyName := util.GetCSIAttachLimitKey(driverName)
node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
return node, true, nil
}
}
func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
limitKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))

View File

@ -19,12 +19,14 @@ package nodeinfomanager
import (
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"math"
"reflect"
"testing"
"k8s.io/apimachinery/pkg/runtime"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@ -40,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/features"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
utilpointer "k8s.io/utils/pointer"
)
type testcase struct {
@ -107,6 +110,7 @@ func TestInstallCSIDriver(t *testing.T) {
nodeIDMap{
"com.example.csi.driver1": "com.example.csi/csi-node1",
},
nil, /* volumeLimits */
topologyKeyMap{
"com.example.csi.driver1": {"com.example.csi/zone"},
},
@ -130,6 +134,7 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: []string{"com.example.csi/zone"},
Allocatable: nil,
},
},
},
@ -147,6 +152,7 @@ func TestInstallCSIDriver(t *testing.T) {
nodeIDMap{
"com.example.csi.driver1": "com.example.csi/csi-node1",
},
nil, /* volumeLimits */
nil, /* topologyKeys */
),
inputNodeID: "com.example.csi/csi-node1",
@ -168,6 +174,7 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: []string{"com.example.csi/zone"},
Allocatable: nil,
},
},
},
@ -187,6 +194,7 @@ func TestInstallCSIDriver(t *testing.T) {
nodeIDMap{
"net.example.storage.other-driver": "net.example.storage/test-node",
},
nil, /* volumeLimits */
topologyKeyMap{
"net.example.storage.other-driver": {"net.example.storage/rack"},
},
@ -216,11 +224,13 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "net.example.storage.other-driver",
NodeID: "net.example.storage/test-node",
TopologyKeys: []string{"net.example.storage/rack"},
Allocatable: nil,
},
{
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: []string{"com.example.csi/zone"},
Allocatable: nil,
},
},
},
@ -240,6 +250,7 @@ func TestInstallCSIDriver(t *testing.T) {
nodeIDMap{
"com.example.csi.driver1": "com.example.csi/csi-node1",
},
nil, /* volumeLimits */
topologyKeyMap{
"com.example.csi.driver1": {"com.example.csi/zone"},
},
@ -264,6 +275,7 @@ func TestInstallCSIDriver(t *testing.T) {
nodeIDMap{
"com.example.csi.driver1": "com.example.csi/csi-node1",
},
nil, /* volumeLimits */
topologyKeyMap{
"com.example.csi.driver1": {"com.example.csi/zone"},
},
@ -290,6 +302,7 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/other-node",
TopologyKeys: []string{"com.example.csi/rack"},
Allocatable: nil,
},
},
},
@ -315,6 +328,7 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: nil,
Allocatable: nil,
},
},
},
@ -334,6 +348,7 @@ func TestInstallCSIDriver(t *testing.T) {
nodeIDMap{
"com.example.csi.driver1": "com.example.csi/csi-node1",
},
nil, /* volumeLimits */
topologyKeyMap{
"com.example.csi.driver1": {"com.example.csi/zone"},
},
@ -357,6 +372,7 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: nil,
Allocatable: nil,
},
},
},
@ -376,6 +392,7 @@ func TestInstallCSIDriver(t *testing.T) {
nodeIDMap{
"net.example.storage.other-driver": "net.example.storage/test-node",
},
nil, /* volumeLimits */
topologyKeyMap{
"net.example.storage.other-driver": {"net.example.storage/rack"},
},
@ -402,11 +419,13 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "net.example.storage.other-driver",
NodeID: "net.example.storage/test-node",
TopologyKeys: []string{"net.example.storage/rack"},
Allocatable: nil,
},
{
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: nil,
Allocatable: nil,
},
},
},
@ -420,7 +439,7 @@ func TestInstallCSIDriver(t *testing.T) {
expectFail: true,
},
{
name: "new node with valid max limit",
name: "new node with valid max limit of volumes",
driverName: "com.example.csi.driver1",
existingNode: generateNode(nil /*nodeIDs*/, nil /*labels*/, nil /*capacity*/),
inputVolumeLimit: 10,
@ -431,15 +450,94 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "node1",
Annotations: map[string]string{annotationKeyNodeID: marshall(nodeIDMap{"com.example.csi.driver1": "com.example.csi/csi-node1"})},
},
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi.driver1")): *resource.NewQuantity(10, resource.DecimalSI),
},
expectedCSINode: &storage.CSINode{
ObjectMeta: getCSINodeObjectMeta(),
Spec: storage.CSINodeSpec{
Drivers: []storage.CSINodeDriver{
{
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: nil,
Allocatable: &storage.VolumeNodeResources{
Count: utilpointer.Int32Ptr(10),
},
},
},
Allocatable: v1.ResourceList{
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi.driver1")): *resource.NewQuantity(10, resource.DecimalSI),
},
},
},
{
name: "new node with max limit of volumes",
driverName: "com.example.csi.driver1",
existingNode: generateNode(nil /*nodeIDs*/, nil /*labels*/, nil /*capacity*/),
inputVolumeLimit: math.MaxInt32,
inputTopology: nil,
inputNodeID: "com.example.csi/csi-node1",
expectedNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Annotations: map[string]string{annotationKeyNodeID: marshall(nodeIDMap{"com.example.csi.driver1": "com.example.csi/csi-node1"})},
},
},
expectedCSINode: &storage.CSINode{
ObjectMeta: getCSINodeObjectMeta(),
Spec: storage.CSINodeSpec{
Drivers: []storage.CSINodeDriver{
{
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: nil,
Allocatable: &storage.VolumeNodeResources{
Count: utilpointer.Int32Ptr(math.MaxInt32),
},
},
},
},
},
},
{
name: "new node with overflown max limit of volumes",
driverName: "com.example.csi.driver1",
existingNode: generateNode(nil /*nodeIDs*/, nil /*labels*/, nil /*capacity*/),
inputVolumeLimit: math.MaxInt32 + 1,
inputTopology: nil,
inputNodeID: "com.example.csi/csi-node1",
expectedNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Annotations: map[string]string{annotationKeyNodeID: marshall(nodeIDMap{"com.example.csi.driver1": "com.example.csi/csi-node1"})},
},
},
expectedCSINode: &storage.CSINode{
ObjectMeta: getCSINodeObjectMeta(),
Spec: storage.CSINodeSpec{
Drivers: []storage.CSINodeDriver{
{
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: nil,
Allocatable: &storage.VolumeNodeResources{
Count: utilpointer.Int32Ptr(math.MaxInt32),
},
},
},
},
},
},
{
name: "new node without max limit of volumes",
driverName: "com.example.csi.driver1",
existingNode: generateNode(nil /*nodeIDs*/, nil /*labels*/, nil /*capacity*/),
inputVolumeLimit: 0,
inputTopology: nil,
inputNodeID: "com.example.csi/csi-node1",
expectedNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Annotations: map[string]string{annotationKeyNodeID: marshall(nodeIDMap{"com.example.csi.driver1": "com.example.csi/csi-node1"})},
},
},
expectedCSINode: &storage.CSINode{
ObjectMeta: getCSINodeObjectMeta(),
Spec: storage.CSINodeSpec{
@ -454,15 +552,23 @@ func TestInstallCSIDriver(t *testing.T) {
},
},
{
name: "node with existing valid max limit",
name: "node with existing valid max limit of volumes",
driverName: "com.example.csi.driver1",
existingNode: generateNode(
nil, /*nodeIDs*/
nil, /*labels*/
map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: *resource.NewScaledQuantity(4, -3),
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi/driver1")): *resource.NewQuantity(10, resource.DecimalSI),
}),
existingCSINode: generateCSINode(
nodeIDMap{
"com.example.csi.driver1": "com.example.csi/csi-node1",
},
generateVolumeLimits(10),
nil, /* topologyKeys */
),
inputVolumeLimit: 20,
inputTopology: nil,
inputNodeID: "com.example.csi/csi-node1",
@ -473,14 +579,10 @@ func TestInstallCSIDriver(t *testing.T) {
},
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi.driver1")): *resource.NewQuantity(20, resource.DecimalSI),
v1.ResourceCPU: *resource.NewScaledQuantity(4, -3),
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi/driver1")): *resource.NewQuantity(10, resource.DecimalSI),
},
Allocatable: v1.ResourceList{
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi.driver1")): *resource.NewQuantity(20, resource.DecimalSI),
v1.ResourceCPU: *resource.NewScaledQuantity(4, -3),
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi/driver1")): *resource.NewQuantity(10, resource.DecimalSI),
},
},
},
@ -492,6 +594,7 @@ func TestInstallCSIDriver(t *testing.T) {
Name: "com.example.csi.driver1",
NodeID: "com.example.csi/csi-node1",
TopologyKeys: nil,
Allocatable: generateVolumeLimits(10),
},
},
},
@ -502,6 +605,12 @@ func TestInstallCSIDriver(t *testing.T) {
test(t, true /* addNodeInfo */, true /* csiNodeInfoEnabled */, testcases)
}
func generateVolumeLimits(i int32) *storage.VolumeNodeResources {
return &storage.VolumeNodeResources{
Count: utilpointer.Int32Ptr(i),
}
}
// TestInstallCSIDriver_CSINodeInfoDisabled tests InstallCSIDriver with various existing Node annotations
// and CSINodeInfo feature gate disabled.
func TestInstallCSIDriverCSINodeInfoDisabled(t *testing.T) {
@ -589,6 +698,7 @@ func TestUninstallCSIDriver(t *testing.T) {
nodeIDMap{
"com.example.csi.driver1": "com.example.csi/csi-node1",
},
nil, /* volumeLimits */
topologyKeyMap{
"com.example.csi.driver1": {"com.example.csi/zone"},
},
@ -619,6 +729,7 @@ func TestUninstallCSIDriver(t *testing.T) {
nodeIDMap{
"net.example.storage.other-driver": "net.example.storage/csi-node1",
},
nil, /* volumeLimits */
topologyKeyMap{
"net.example.storage.other-driver": {"net.example.storage/zone"},
},
@ -1116,12 +1227,13 @@ func marshall(nodeIDs nodeIDMap) string {
return string(b)
}
func generateCSINode(nodeIDs nodeIDMap, topologyKeys topologyKeyMap) *storage.CSINode {
func generateCSINode(nodeIDs nodeIDMap, volumeLimits *storage.VolumeNodeResources, topologyKeys topologyKeyMap) *storage.CSINode {
nodeDrivers := []storage.CSINodeDriver{}
for k, nodeID := range nodeIDs {
dspec := storage.CSINodeDriver{
Name: k,
NodeID: nodeID,
Name: k,
NodeID: nodeID,
Allocatable: volumeLimits,
}
if top, exists := topologyKeys[k]; exists {
dspec.TopologyKeys = top

View File

@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -33,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@ -357,12 +357,12 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
init(testParameters{nodeSelectorKey: nodeSelectorKey, attachLimit: 2})
defer cleanup()
nodeName := m.config.ClientNodeName
attachKey := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(m.provisioner))
driverName := m.config.GetUniqueDriverName()
nodeAttachLimit, err := checkNodeForLimits(nodeName, attachKey, m.cs)
framework.ExpectNoError(err, "while fetching node %v", err)
csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
gomega.Expect(nodeAttachLimit).To(gomega.Equal(2))
gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 2))
_, _, pod1 := createPod()
gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating first pod")
@ -576,25 +576,21 @@ func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
return waitErr
}
func checkNodeForLimits(nodeName string, attachKey v1.ResourceName, cs clientset.Interface) (int, error) {
var attachLimit int64
func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Interface) (int32, error) {
var attachLimit int32
waitErr := wait.PollImmediate(10*time.Second, csiNodeLimitUpdateTimeout, func() (bool, error) {
node, err := cs.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
csiNode, err := cs.StorageV1beta1().CSINodes().Get(nodeName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return false, err
}
limits := getVolumeLimit(node)
var ok bool
if len(limits) > 0 {
attachLimit, ok = limits[attachKey]
if ok {
return true, nil
}
attachLimit = getVolumeLimitFromCSINode(csiNode, driverName)
if attachLimit > 0 {
return true, nil
}
return false, nil
})
return int(attachLimit), waitErr
return attachLimit, waitErr
}
func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node framework.NodeSelection, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
@ -805,3 +801,15 @@ func getVolumeHandle(cs clientset.Interface, claim *v1.PersistentVolumeClaim) st
}
return pv.Spec.CSI.VolumeHandle
}
func getVolumeLimitFromCSINode(csiNode *storagev1beta1.CSINode, driverName string) int32 {
for _, d := range csiNode.Spec.Drivers {
if d.Name != driverName {
continue
}
if d.Allocatable != nil && d.Allocatable.Count != nil {
return *d.Allocatable.Count
}
}
return 0
}