From c5d9af2bda5a628b1a4123b4bfff6029312c32cc Mon Sep 17 00:00:00 2001 From: Fabio Bertinatto Date: Wed, 17 Jul 2019 09:57:03 +0200 Subject: [PATCH] Update predicates to use cached CSINode in scheduler --- .../predicates/csi_volume_predicate.go | 48 ++++++++++++++---- .../predicates/csi_volume_predicate_test.go | 20 +++++--- .../max_attachable_volume_predicate_test.go | 50 ++++++++++--------- .../algorithm/predicates/predicates.go | 29 +++++++++-- .../algorithm/predicates/testing_helper.go | 12 ++++- .../defaults/register_predicates.go | 10 ++-- 6 files changed, 118 insertions(+), 51 deletions(-) diff --git a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go index a93339c2dd6..405eb96741f 100644 --- a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go +++ b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go @@ -32,17 +32,19 @@ import ( // CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes type CSIMaxVolumeLimitChecker struct { - pvInfo PersistentVolumeInfo - pvcInfo PersistentVolumeClaimInfo - scInfo StorageClassInfo + csiNodeInfo CSINodeInfo + pvInfo PersistentVolumeInfo + pvcInfo PersistentVolumeClaimInfo + scInfo StorageClassInfo randomVolumeIDPrefix string } // NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes func NewCSIMaxVolumeLimitPredicate( - pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, scInfo StorageClassInfo) FitPredicate { + csiNodeInfo CSINodeInfo, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, scInfo StorageClassInfo) FitPredicate { c := &CSIMaxVolumeLimitChecker{ + csiNodeInfo: csiNodeInfo, pvInfo: pvInfo, pvcInfo: pvcInfo, scInfo: scInfo, @@ -51,6 +53,22 @@ func NewCSIMaxVolumeLimitPredicate( return c.attachableLimitPredicate } +func (c *CSIMaxVolumeLimitChecker) getVolumeLimits(nodeInfo *schedulernodeinfo.NodeInfo, csiNode *storagev1beta1.CSINode) map[v1.ResourceName]int64 { + // TODO: stop getting values from Node object in v1.18 + nodeVolumeLimits := nodeInfo.VolumeLimits() + if csiNode != nil { + for i := range csiNode.Spec.Drivers { + d := csiNode.Spec.Drivers[i] + if d.Allocatable != nil && d.Allocatable.Count != nil { + // TODO: drop GetCSIAttachLimitKey once we don't get values from Node object (v1.18) + k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name)) + nodeVolumeLimits[k] = int64(*d.Allocatable.Count) + } + } + } + return nodeVolumeLimits +} + func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { // If the new pod doesn't have any volume attached to it, the predicate will always be true @@ -62,8 +80,20 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( return true, nil, nil } + node := nodeInfo.Node() + if node == nil { + return false, nil, fmt.Errorf("node not found") + } + + // If CSINode doesn't exist, the predicate may read the limits from Node object + csiNode, err := c.csiNodeInfo.GetCSINodeInfo(node.Name) + if err != nil { + // TODO: return the error once CSINode is created by default (2 releases) + klog.V(5).Infof("Could not get a CSINode object for the node: %v", err) + } + newVolumes := make(map[string]string) - if err := c.filterAttachableVolumes(nodeInfo, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + if err := c.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { return false, nil, err } @@ -73,14 +103,14 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( } // If the node doesn't have volume limits, the predicate will always be true - nodeVolumeLimits := nodeInfo.VolumeLimits() + nodeVolumeLimits := c.getVolumeLimits(nodeInfo, csiNode) if len(nodeVolumeLimits) == 0 { return true, nil, nil } attachedVolumes := make(map[string]string) for _, existingPod := range nodeInfo.Pods() { - if err := c.filterAttachableVolumes(nodeInfo, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil { + if err := c.filterAttachableVolumes(csiNode, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil { return false, nil, err } } @@ -113,8 +143,7 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( } func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( - nodeInfo *schedulernodeinfo.NodeInfo, volumes []v1.Volume, namespace string, result map[string]string) error { - + csiNode *storagev1beta1.CSINode, volumes []v1.Volume, namespace string, result map[string]string) error { for _, vol := range volumes { // CSI volumes can only be used as persistent volumes if vol.PersistentVolumeClaim == nil { @@ -133,7 +162,6 @@ func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( continue } - csiNode := nodeInfo.CSINode() driverName, volumeHandle := c.getCSIDriverInfo(csiNode, pvc) if driverName == "" || volumeHandle == "" { klog.V(5).Infof("Could not find a CSI driver name or volume handle, not counting volume") diff --git a/pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go b/pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go index b0c4e7f297e..12167108772 100644 --- a/pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go +++ b/pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go @@ -23,13 +23,13 @@ import ( "testing" v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" csilibplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/features" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) const ( @@ -437,11 +437,11 @@ func TestCSIVolumeCountPredicate(t *testing.T) { // running attachable predicate tests with feature gate and limit present on nodes for _, test := range tests { t.Run(test.test, func(t *testing.T) { - node := getNodeWithPodAndVolumeLimits(test.limitSource, test.existingPods, int64(test.maxVols), test.driverNames...) + node, csiNode := getNodeWithPodAndVolumeLimits(test.limitSource, test.existingPods, int64(test.maxVols), test.driverNames...) if test.migrationEnabled { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)() - enableMigrationOnNode(node, csilibplugins.AWSEBSInTreePluginName) + enableMigrationOnNode(csiNode, csilibplugins.AWSEBSInTreePluginName) } else { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)() @@ -452,7 +452,8 @@ func TestCSIVolumeCountPredicate(t *testing.T) { expectedFailureReasons = []PredicateFailureReason{test.expectedFailureReason} } - pred := NewCSIMaxVolumeLimitPredicate(getFakeCSIPVInfo(test.filterName, test.driverNames...), + pred := NewCSIMaxVolumeLimitPredicate(getFakeCSINodeInfo(csiNode), + getFakeCSIPVInfo(test.filterName, test.driverNames...), getFakeCSIPVCInfo(test.filterName, "csi-sc", test.driverNames...), getFakeCSIStorageClassInfo("csi-sc", test.driverNames[0])) @@ -544,8 +545,7 @@ func getFakeCSIPVCInfo(volumeName, scName string, driverNames ...string) FakePer return pvcInfos } -func enableMigrationOnNode(nodeInfo *schedulernodeinfo.NodeInfo, pluginName string) { - csiNode := nodeInfo.CSINode() +func enableMigrationOnNode(csiNode *storagev1beta1.CSINode, pluginName string) { nodeInfoAnnotations := csiNode.GetAnnotations() if nodeInfoAnnotations == nil { nodeInfoAnnotations = map[string]string{} @@ -557,7 +557,6 @@ func enableMigrationOnNode(nodeInfo *schedulernodeinfo.NodeInfo, pluginName stri nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas csiNode.Annotations = nodeInfoAnnotations - nodeInfo.SetCSINode(csiNode) } func getFakeCSIStorageClassInfo(scName, provisionerName string) FakeStorageClassInfo { @@ -568,3 +567,10 @@ func getFakeCSIStorageClassInfo(scName, provisionerName string) FakeStorageClass }, } } + +func getFakeCSINodeInfo(csiNode *storagev1beta1.CSINode) FakeCSINodeInfo { + if csiNode != nil { + return FakeCSINodeInfo(*csiNode) + } + return FakeCSINodeInfo{} +} diff --git a/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go b/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go index eb6b820bcea..8b078063823 100644 --- a/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go +++ b/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go @@ -835,12 +835,14 @@ func TestVolumeCountConflicts(t *testing.T) { // running attachable predicate tests without feature gate and no limit present on nodes for _, test := range tests { os.Setenv(KubeMaxPDVols, strconv.Itoa(test.maxVols)) + node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) pred := NewMaxPDVolumeCountPredicate(test.filterName, + getFakeCSINodeInfo(csiNode), getFakeStorageClassInfo(test.filterName), getFakePVInfo(test.filterName), getFakePVCInfo(test.filterName)) - fits, reasons, err := pred(test.newPod, GetPredicateMetadata(test.newPod, nil), schedulernodeinfo.NewNodeInfo(test.existingPods...)) + fits, reasons, err := pred(test.newPod, GetPredicateMetadata(test.newPod, nil), node) if err != nil { t.Errorf("[%s]%s: unexpected error: %v", test.filterName, test.test, err) } @@ -856,8 +858,9 @@ func TestVolumeCountConflicts(t *testing.T) { // running attachable predicate tests with feature gate and limit present on nodes for _, test := range tests { - node := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) + node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) pred := NewMaxPDVolumeCountPredicate(test.filterName, + getFakeCSINodeInfo(csiNode), getFakeStorageClassInfo(test.filterName), getFakePVInfo(test.filterName), getFakePVCInfo(test.filterName)) @@ -1048,23 +1051,24 @@ func TestMaxVolumeFuncM4(t *testing.T) { } } -func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int64, driverNames ...string) *schedulernodeinfo.NodeInfo { +func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int64, driverNames ...string) (*schedulernodeinfo.NodeInfo, *v1beta1.CSINode) { nodeInfo := schedulernodeinfo.NewNodeInfo(pods...) + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"}, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{}, + }, + } + var csiNode *v1beta1.CSINode + addLimitToNode := func() { - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"}, - Status: v1.NodeStatus{ - Allocatable: v1.ResourceList{}, - }, - } for _, driver := range driverNames { node.Status.Allocatable[getVolumeLimitKey(driver)] = *resource.NewQuantity(limit, resource.DecimalSI) } - nodeInfo.SetNode(node) } - createCSINode := func() *v1beta1.CSINode { - return &v1beta1.CSINode{ + initCSINode := func() { + csiNode = &v1beta1.CSINode{ ObjectMeta: metav1.ObjectMeta{Name: "csi-node-for-max-pd-test-1"}, Spec: v1beta1.CSINodeSpec{ Drivers: []v1beta1.CSINodeDriver{}, @@ -1072,8 +1076,8 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int } } - addLimitToCSINode := func(addLimits bool) { - csiNode := createCSINode() + addDriversCSINode := func(addLimits bool) { + initCSINode() for _, driver := range driverNames { driver := v1beta1.CSINodeDriver{ Name: driver, @@ -1086,26 +1090,26 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int } csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, driver) } - - nodeInfo.SetCSINode(csiNode) } + switch limitSource { case "node": addLimitToNode() case "csinode": - addLimitToCSINode(true) + addDriversCSINode(true) case "both": addLimitToNode() - addLimitToCSINode(true) + addDriversCSINode(true) case "csinode-with-no-limit": - addLimitToCSINode(false) + addDriversCSINode(false) case "no-csi-driver": - csiNode := createCSINode() - nodeInfo.SetCSINode(csiNode) + initCSINode() default: - return nodeInfo + // Do nothing. } - return nodeInfo + + nodeInfo.SetNode(node) + return nodeInfo, csiNode } func getVolumeLimitKey(filterType string) v1.ResourceName { diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index fa67c98f50e..21001907080 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -162,6 +162,11 @@ type NodeInfo interface { GetNodeInfo(nodeID string) (*v1.Node, error) } +// CSINodeInfo interface represents anything that can get CSINode object from node ID. +type CSINodeInfo interface { + GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) +} + // PersistentVolumeInfo interface represents anything that can get persistent volume object by PV ID. type PersistentVolumeInfo interface { GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) @@ -285,6 +290,7 @@ type MaxPDVolumeCountChecker struct { filter VolumeFilter volumeLimitKey v1.ResourceName maxVolumeFunc func(node *v1.Node) int + csiNodeInfo CSINodeInfo pvInfo PersistentVolumeInfo pvcInfo PersistentVolumeClaimInfo scInfo StorageClassInfo @@ -316,8 +322,8 @@ type VolumeFilter struct { // The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume // types, counts the number of unique volumes, and rejects the new pod if it would place the total count over // the maximum. -func NewMaxPDVolumeCountPredicate( - filterName string, scInfo StorageClassInfo, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) FitPredicate { +func NewMaxPDVolumeCountPredicate(filterName string, csiNodeInfo CSINodeInfo, scInfo StorageClassInfo, + pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) FitPredicate { var filter VolumeFilter var volumeLimitKey v1.ResourceName @@ -345,6 +351,7 @@ func NewMaxPDVolumeCountPredicate( filter: filter, volumeLimitKey: volumeLimitKey, maxVolumeFunc: getMaxVolumeFunc(filterName), + csiNodeInfo: csiNodeInfo, pvInfo: pvInfo, pvcInfo: pvcInfo, scInfo: scInfo, @@ -492,8 +499,20 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata, return true, nil, nil } - // If a plugin has been migrated to a CSI driver, defer to the CSI predicate. - if c.filter.IsMigrated(nodeInfo.CSINode()) { + node := nodeInfo.Node() + if node == nil { + return false, nil, fmt.Errorf("node not found") + } + + csiNode, err := c.csiNodeInfo.GetCSINodeInfo(node.Name) + if err != nil { + // we don't fail here because the CSINode object is only necessary + // for determining whether the migration is enabled or not + klog.V(5).Infof("Could not get a CSINode object for the node: %v", err) + } + + // if a plugin has been migrated to a CSI driver, defer to the CSI predicate + if c.filter.IsMigrated(csiNode) { return true, nil, nil } @@ -514,7 +533,7 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata, } numNewVolumes := len(newVolumes) - maxAttachLimit := c.maxVolumeFunc(nodeInfo.Node()) + maxAttachLimit := c.maxVolumeFunc(node) if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { volumeLimits := nodeInfo.VolumeLimits() diff --git a/pkg/scheduler/algorithm/predicates/testing_helper.go b/pkg/scheduler/algorithm/predicates/testing_helper.go index c8e7cb1e17c..e40fa5a355c 100644 --- a/pkg/scheduler/algorithm/predicates/testing_helper.go +++ b/pkg/scheduler/algorithm/predicates/testing_helper.go @@ -19,8 +19,9 @@ package predicates import ( "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" ) // FakePersistentVolumeClaimInfo declares a []v1.PersistentVolumeClaim type for testing. @@ -58,6 +59,15 @@ func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*v1.Node, error) { return nil, fmt.Errorf("Unable to find node: %s", nodeName) } +// FakeCSINodeInfo declares a storagev1beta1.CSINode type for testing. +type FakeCSINodeInfo storagev1beta1.CSINode + +// GetCSINodeInfo returns a fake CSINode object. +func (n FakeCSINodeInfo) GetCSINodeInfo(name string) (*storagev1beta1.CSINode, error) { + csiNode := storagev1beta1.CSINode(n) + return &csiNode, nil +} + // FakePersistentVolumeInfo declares a []v1.PersistentVolume type for testing. type FakePersistentVolumeInfo []v1.PersistentVolume diff --git a/pkg/scheduler/algorithmprovider/defaults/register_predicates.go b/pkg/scheduler/algorithmprovider/defaults/register_predicates.go index b1283261b8d..eb6bd09618a 100644 --- a/pkg/scheduler/algorithmprovider/defaults/register_predicates.go +++ b/pkg/scheduler/algorithmprovider/defaults/register_predicates.go @@ -61,33 +61,33 @@ func init() { factory.RegisterFitPredicateFactory( predicates.MaxEBSVolumeCountPred, func(args factory.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) + return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo) }, ) // Fit is determined by whether or not there would be too many GCE PD volumes attached to the node factory.RegisterFitPredicateFactory( predicates.MaxGCEPDVolumeCountPred, func(args factory.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) + return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo) }, ) // Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node factory.RegisterFitPredicateFactory( predicates.MaxAzureDiskVolumeCountPred, func(args factory.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) + return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo) }, ) factory.RegisterFitPredicateFactory( predicates.MaxCSIVolumeCountPred, func(args factory.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewCSIMaxVolumeLimitPredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo) + return predicates.NewCSIMaxVolumeLimitPredicate(args.CSINodeInfo, args.PVInfo, args.PVCInfo, args.StorageClassInfo) }, ) factory.RegisterFitPredicateFactory( predicates.MaxCinderVolumeCountPred, func(args factory.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) + return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo) }, )