From 4b17a48def1c995f6033643d4045f7ecb53b09ba Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 13 Aug 2018 19:38:30 -0400 Subject: [PATCH 1/3] Implement support for updating volume limits Create a new predicate to count CSI volumes --- .../predicates/csi_volume_predicate.go | 154 +++++++++++ .../algorithm/predicates/predicates.go | 4 +- .../algorithmprovider/defaults/defaults.go | 6 + .../defaults/defaults_test.go | 1 + pkg/scheduler/core/equivalence/eqivalence.go | 18 +- pkg/scheduler/factory/factory.go | 12 + pkg/volume/csi/BUILD | 4 +- pkg/volume/csi/csi_plugin.go | 11 +- pkg/volume/csi/labelmanager/labelmanager.go | 250 ------------------ .../csi/{labelmanager => nodeupdater}/BUILD | 7 +- pkg/volume/csi/nodeupdater/nodeupdater.go | 193 ++++++++++++++ pkg/volume/util/BUILD | 2 + pkg/volume/util/attach_limit.go | 26 ++ pkg/volume/util/attach_limit_test.go | 40 +++ test/integration/scheduler/scheduler_test.go | 1 + 15 files changed, 464 insertions(+), 265 deletions(-) create mode 100644 pkg/scheduler/algorithm/predicates/csi_volume_predicate.go delete mode 100644 pkg/volume/csi/labelmanager/labelmanager.go rename pkg/volume/csi/{labelmanager => nodeupdater}/BUILD (75%) create mode 100644 pkg/volume/csi/nodeupdater/nodeupdater.go create mode 100644 pkg/volume/util/attach_limit_test.go diff --git a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go new file mode 100644 index 00000000000..db6fdd39170 --- /dev/null +++ b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go @@ -0,0 +1,154 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/scheduler/algorithm" + schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +// CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes +type CSIMaxVolumeLimitChecker struct { + pvInfo PersistentVolumeInfo + pvcInfo PersistentVolumeClaimInfo +} + +// NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes +func NewCSIMaxVolumeLimitPredicate( + pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate { + c := &CSIMaxVolumeLimitChecker{ + pvInfo: pvInfo, + pvcInfo: pvcInfo, + } + return c.attachableLimitPredicate +} + +func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( + pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + + // if feature gate is disable we return + if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + return true, nil, nil + } + // If a pod doesn't have any volume attached to it, the predicate will always be true. + // Thus we make a fast path for it, to avoid unnecessary computations in this case. + if len(pod.Spec.Volumes) == 0 { + return true, nil, nil + } + + nodeVolumeLimits := nodeInfo.VolumeLimits() + + // if node does not have volume limits this predicate should exit + if len(nodeVolumeLimits) == 0 { + return true, nil, nil + } + + // a map of unique volume name/csi volume handle and volume limit key + newVolumes := make(map[string]string) + if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + return false, nil, err + } + + if len(newVolumes) == 0 { + return true, nil, nil + } + + // a map of unique volume name/csi volume handle and volume limit key + attachedVolumes := make(map[string]string) + for _, existingPod := range nodeInfo.Pods() { + if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil { + return false, nil, err + } + } + + newVolumeCount := map[string]int{} + attachedVolumeCount := map[string]int{} + + for volumeName, volumeLimitKey := range attachedVolumes { + if _, ok := newVolumes[volumeName]; ok { + delete(newVolumes, volumeName) + } + attachedVolumeCount[volumeLimitKey]++ + } + + for _, volumeLimitKey := range newVolumes { + newVolumeCount[volumeLimitKey]++ + } + + for volumeLimitKey, count := range newVolumeCount { + maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)] + if ok { + currentVolumeCount := attachedVolumeCount[volumeLimitKey] + if currentVolumeCount+count > int(maxVolumeLimit) { + return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil + } + } + } + + return true, nil, nil +} + +func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( + volumes []v1.Volume, namespace string, result map[string]string) error { + + for _, vol := range volumes { + // CSI volumes can only be used as persistent volumes + if vol.PersistentVolumeClaim != nil { + pvcName := vol.PersistentVolumeClaim.ClaimName + + if pvcName == "" { + return fmt.Errorf("PersistentVolumeClaim had no name") + } + + pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName) + + if err != nil { + glog.Errorf("Unable to look up PVC info for %s/%s", namespace, pvcName) + continue + } + + pvName := pvc.Spec.VolumeName + if pvName == "" { + glog.Errorf("Persistent volume had no name for claim %s/%s", namespace, pvcName) + continue + } + pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName) + + if err != nil { + glog.Errorf("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName) + continue + } + + csiSource := pv.Spec.PersistentVolumeSource.CSI + if csiSource == nil { + glog.V(4).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName) + continue + } + driverName := csiSource.Driver + volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName) + result[csiSource.VolumeHandle] = volumeLimitKey + } + } + return nil +} diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index c919282c837..953fcef25e4 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -85,6 +85,8 @@ const ( MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount" // MaxAzureDiskVolumeCountPred defines the name of predicate MaxAzureDiskVolumeCount. MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount" + // MaxCSIVolumeCountPred defines the predicate that decides how many CSI volumes should be attached + MaxCSIVolumeCountPred = "MaxCSIVolumeCountPred" // NoVolumeZoneConflictPred defines the name of predicate NoVolumeZoneConflict. NoVolumeZoneConflictPred = "NoVolumeZoneConflict" // CheckNodeMemoryPressurePred defines the name of predicate CheckNodeMemoryPressure. @@ -137,7 +139,7 @@ var ( GeneralPred, HostNamePred, PodFitsHostPortsPred, MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred, PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred, - CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, + CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred, MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred, CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred} ) diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults.go b/pkg/scheduler/algorithmprovider/defaults/defaults.go index 94d73b7e7fa..625d8b6742b 100644 --- a/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -137,6 +137,12 @@ func defaultPredicates() sets.String { return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo) }, ), + factory.RegisterFitPredicateFactory( + predicates.MaxCSIVolumeCountPred, + func(args factory.PluginFactoryArgs) algorithm.FitPredicate { + return predicates.NewCSIMaxVolumeLimitPredicate(args.PVInfo, args.PVCInfo) + }, + ), // Fit is determined by inter-pod affinity. factory.RegisterFitPredicateFactory( predicates.MatchInterPodAffinityPred, diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults_test.go b/pkg/scheduler/algorithmprovider/defaults/defaults_test.go index 8dbc2f1536f..eab84151a51 100644 --- a/pkg/scheduler/algorithmprovider/defaults/defaults_test.go +++ b/pkg/scheduler/algorithmprovider/defaults/defaults_test.go @@ -71,6 +71,7 @@ func TestDefaultPredicates(t *testing.T) { predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, + predicates.MaxCSIVolumeCountPred, predicates.MatchInterPodAffinityPred, predicates.NoDiskConflictPred, predicates.GeneralPred, diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go index 115acc4d181..8101809ad21 100644 --- a/pkg/scheduler/core/equivalence/eqivalence.go +++ b/pkg/scheduler/core/equivalence/eqivalence.go @@ -23,16 +23,16 @@ import ( "hash/fnv" "sync" - "k8s.io/kubernetes/pkg/scheduler/metrics" - + "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + "k8s.io/kubernetes/pkg/scheduler/metrics" hashutil "k8s.io/kubernetes/pkg/util/hash" - - "github.com/golang/glog" ) // Cache is a thread safe map saves and reuses the output of predicate functions, @@ -136,8 +136,16 @@ func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName str // MaxPDVolumeCountPredicate: we check the volumes of pod to make decisioc. for _, vol := range pod.Spec.Volumes { if vol.PersistentVolumeClaim != nil { - invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred) + invalidPredicates.Insert( + predicates.MaxEBSVolumeCountPred, + predicates.MaxGCEPDVolumeCountPred, + predicates.MaxAzureDiskVolumeCountPred) + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } } else { + // We do not consider CSI volumes here because CSI + // volumes can not be used inline. if vol.AWSElasticBlockStore != nil { invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred) } diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index a61ae267af0..bf00c74193d 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -488,6 +488,10 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred) } + if pv.Spec.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } + // If PV contains zone related label, it may impact cached NoVolumeZoneConflict for k := range pv.Labels { if isZoneRegionLabel(k) { @@ -564,6 +568,10 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim // The bound volume type may change invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...) + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } + // The bound volume's label may change invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) @@ -584,6 +592,10 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent } // The bound volume type may change invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) + + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } } c.equivalencePodCache.InvalidatePredicates(invalidPredicates) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index e5f55a15d8a..1e9253d4f95 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -16,7 +16,7 @@ go_library( "//pkg/features:go_default_library", "//pkg/util/strings:go_default_library", "//pkg/volume:go_default_library", - "//pkg/volume/csi/labelmanager:go_default_library", + "//pkg/volume/csi/nodeupdater:go_default_library", "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", @@ -75,7 +75,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/volume/csi/fake:all-srcs", - "//pkg/volume/csi/labelmanager:all-srcs", + "//pkg/volume/csi/nodeupdater:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index cc9e3c97901..4328c6e3e7c 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -26,6 +26,7 @@ import ( "time" "context" + "github.com/golang/glog" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +34,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" - "k8s.io/kubernetes/pkg/volume/csi/labelmanager" + "k8s.io/kubernetes/pkg/volume/csi/nodeupdater" ) const ( @@ -82,7 +83,7 @@ type csiDriversStore struct { // corresponding sockets var csiDrivers csiDriversStore -var lm labelmanager.Interface +var nodeUpdater nodeupdater.Interface // RegistrationCallback is called by kubelet's plugin watcher upon detection // of a new registration socket opened by CSI Driver registrar side car. @@ -106,13 +107,13 @@ func RegistrationCallback(pluginName string, endpoint string, versions []string, // TODO (verult) retry with exponential backoff, possibly added in csi client library. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() - driverNodeID, _, _, err := csi.NodeGetInfo(ctx) + driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx) if err != nil { return nil, fmt.Errorf("error during CSI NodeGetInfo() call: %v", err) } // Calling nodeLabelManager to update annotations and labels for newly registered CSI driver - err = lm.AddLabels(pluginName, driverNodeID) + err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode) if err != nil { // Unregister the driver and return error csiDrivers.Lock() @@ -130,7 +131,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { // Initializing csiDrivers map and label management channels csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} - lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient()) + nodeUpdater = nodeupdater.NewNodeUpdater(host.GetNodeName(), host.GetKubeClient()) return nil } diff --git a/pkg/volume/csi/labelmanager/labelmanager.go b/pkg/volume/csi/labelmanager/labelmanager.go deleted file mode 100644 index 79fd5311453..00000000000 --- a/pkg/volume/csi/labelmanager/labelmanager.go +++ /dev/null @@ -1,250 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package labelmanager includes internal functions used to add/delete labels to -// kubernetes nodes for corresponding CSI drivers -package labelmanager // import "k8s.io/kubernetes/pkg/volume/csi/labelmanager" - -import ( - "encoding/json" - "fmt" - - "github.com/golang/glog" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/util/retry" -) - -const ( - // Name of node annotation that contains JSON map of driver names to node - // names - annotationKey = "csi.volume.kubernetes.io/nodeid" -) - -// labelManagementStruct is struct of channels used for communication between the driver registration -// code and the go routine responsible for managing the node's labels -type labelManagerStruct struct { - nodeName types.NodeName - k8s kubernetes.Interface -} - -// Interface implements an interface for managing labels of a node -type Interface interface { - AddLabels(driverName string, driverNodeId string) error -} - -// NewLabelManager initializes labelManagerStruct and returns available interfaces -func NewLabelManager(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface { - return labelManagerStruct{ - nodeName: nodeName, - k8s: kubeClient, - } -} - -// nodeLabelManager waits for labeling requests initiated by the driver's registration -// process. -func (lm labelManagerStruct) AddLabels(driverName string, driverNodeId string) error { - err := verifyAndAddNodeId(string(lm.nodeName), lm.k8s.CoreV1().Nodes(), driverName, driverNodeId) - if err != nil { - return fmt.Errorf("failed to update node %s's annotation with error: %+v", lm.nodeName, err) - } - return nil -} - -// Clones the given map and returns a new map with the given key and value added. -// Returns the given map, if annotationKey is empty. -func cloneAndAddAnnotation( - annotations map[string]string, - annotationKey, - annotationValue string) map[string]string { - if annotationKey == "" { - // Don't need to add an annotation. - return annotations - } - // Clone. - newAnnotations := map[string]string{} - for key, value := range annotations { - newAnnotations[key] = value - } - newAnnotations[annotationKey] = annotationValue - return newAnnotations -} - -func verifyAndAddNodeId( - k8sNodeName string, - k8sNodesClient corev1.NodeInterface, - csiDriverName string, - csiDriverNodeId string) error { - // Add or update annotation on Node object - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Retrieve the latest version of Node before attempting update, so that - // existing changes are not overwritten. RetryOnConflict uses - // exponential backoff to avoid exhausting the apiserver. - result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) - if getErr != nil { - glog.Errorf("Failed to get latest version of Node: %v", getErr) - return getErr // do not wrap error - } - - var previousAnnotationValue string - if result.ObjectMeta.Annotations != nil { - previousAnnotationValue = - result.ObjectMeta.Annotations[annotationKey] - glog.V(3).Infof( - "previousAnnotationValue=%q", previousAnnotationValue) - } - - existingDriverMap := map[string]string{} - if previousAnnotationValue != "" { - // Parse previousAnnotationValue as JSON - if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { - return fmt.Errorf( - "failed to parse node's %q annotation value (%q) err=%v", - annotationKey, - previousAnnotationValue, - err) - } - } - - if val, ok := existingDriverMap[csiDriverName]; ok { - if val == csiDriverNodeId { - // Value already exists in node annotation, nothing more to do - glog.V(2).Infof( - "The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v", - csiDriverName, - csiDriverNodeId, - annotationKey, - previousAnnotationValue) - return nil - } - } - - // Add/update annotation value - existingDriverMap[csiDriverName] = csiDriverNodeId - jsonObj, err := json.Marshal(existingDriverMap) - if err != nil { - return fmt.Errorf( - "failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v", - csiDriverName, - csiDriverNodeId, - annotationKey, - previousAnnotationValue) - } - - result.ObjectMeta.Annotations = cloneAndAddAnnotation( - result.ObjectMeta.Annotations, - annotationKey, - string(jsonObj)) - _, updateErr := k8sNodesClient.Update(result) - if updateErr == nil { - glog.V(2).Infof( - "Updated node %q successfully for CSI driver %q and CSI node name %q", - k8sNodeName, - csiDriverName, - csiDriverNodeId) - } - return updateErr // do not wrap error - }) - if retryErr != nil { - return fmt.Errorf("node update failed: %v", retryErr) - } - return nil -} - -// Fetches Kubernetes node API object corresponding to k8sNodeName. -// If the csiDriverName is present in the node annotation, it is removed. -func verifyAndDeleteNodeId( - k8sNodeName string, - k8sNodesClient corev1.NodeInterface, - csiDriverName string) error { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Retrieve the latest version of Node before attempting update, so that - // existing changes are not overwritten. RetryOnConflict uses - // exponential backoff to avoid exhausting the apiserver. - result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) - if getErr != nil { - glog.Errorf("failed to get latest version of Node: %v", getErr) - return getErr // do not wrap error - } - - var previousAnnotationValue string - if result.ObjectMeta.Annotations != nil { - previousAnnotationValue = - result.ObjectMeta.Annotations[annotationKey] - glog.V(3).Infof( - "previousAnnotationValue=%q", previousAnnotationValue) - } - - existingDriverMap := map[string]string{} - if previousAnnotationValue == "" { - // Value already exists in node annotation, nothing more to do - glog.V(2).Infof( - "The key %q does not exist in node %q annotation, no need to cleanup.", - csiDriverName, - annotationKey) - return nil - } - - // Parse previousAnnotationValue as JSON - if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { - return fmt.Errorf( - "failed to parse node's %q annotation value (%q) err=%v", - annotationKey, - previousAnnotationValue, - err) - } - - if _, ok := existingDriverMap[csiDriverName]; !ok { - // Value already exists in node annotation, nothing more to do - glog.V(2).Infof( - "The key %q does not eixst in node %q annotation, no need to cleanup: %v", - csiDriverName, - annotationKey, - previousAnnotationValue) - return nil - } - - // Add/update annotation value - delete(existingDriverMap, csiDriverName) - jsonObj, err := json.Marshal(existingDriverMap) - if err != nil { - return fmt.Errorf( - "failed while trying to remove key %q from node %q annotation. Existing data: %v", - csiDriverName, - annotationKey, - previousAnnotationValue) - } - - result.ObjectMeta.Annotations = cloneAndAddAnnotation( - result.ObjectMeta.Annotations, - annotationKey, - string(jsonObj)) - _, updateErr := k8sNodesClient.Update(result) - if updateErr == nil { - fmt.Printf( - "Updated node %q annotation to remove CSI driver %q.", - k8sNodeName, - csiDriverName) - } - return updateErr // do not wrap error - }) - if retryErr != nil { - return fmt.Errorf("node update failed: %v", retryErr) - } - return nil -} diff --git a/pkg/volume/csi/labelmanager/BUILD b/pkg/volume/csi/nodeupdater/BUILD similarity index 75% rename from pkg/volume/csi/labelmanager/BUILD rename to pkg/volume/csi/nodeupdater/BUILD index 0f952539ea4..2b8e18a753b 100644 --- a/pkg/volume/csi/labelmanager/BUILD +++ b/pkg/volume/csi/nodeupdater/BUILD @@ -2,10 +2,13 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["labelmanager.go"], - importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager", + srcs = ["nodeupdater.go"], + importpath = "k8s.io/kubernetes/pkg/volume/csi/nodeupdater", visibility = ["//visibility:public"], deps = [ + "//pkg/volume/util:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/volume/csi/nodeupdater/nodeupdater.go b/pkg/volume/csi/nodeupdater/nodeupdater.go new file mode 100644 index 00000000000..5571fe4c725 --- /dev/null +++ b/pkg/volume/csi/nodeupdater/nodeupdater.go @@ -0,0 +1,193 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package nodeupdater includes internal functions used to add/delete labels to +// kubernetes nodes for corresponding CSI drivers +package nodeupdater // import "k8s.io/kubernetes/pkg/volume/csi/nodeupdater" + +import ( + "encoding/json" + "fmt" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/retry" + "k8s.io/kubernetes/pkg/volume/util" +) + +const ( + // Name of node annotation that contains JSON map of driver names to node + // names + annotationKey = "csi.volume.kubernetes.io/nodeid" +) + +// labelManagementStruct is struct of channels used for communication between the driver registration +// code and the go routine responsible for managing the node's labels +type nodeUpdateStruct struct { + nodeName types.NodeName + k8s kubernetes.Interface +} + +// Interface implements an interface for managing labels of a node +type Interface interface { + AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error +} + +// NewNodeupdater initializes nodeUpdateStruct and returns available interfaces +func NewNodeUpdater(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface { + return nodeUpdateStruct{ + nodeName: nodeName, + k8s: kubeClient, + } +} + +// AddLabelsAndLimits nodeUpdater waits for labeling requests initiated by the driver's registration +// process and updates labels and attach limits +func (nodeUpdater nodeUpdateStruct) AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error { + err := addLabelsAndLimits(string(nodeUpdater.nodeName), nodeUpdater.k8s.CoreV1().Nodes(), driverName, driverNodeId, maxLimit) + if err != nil { + return err + } + return nil +} + +func addMaxAttachLimitToNode(node *v1.Node, driverName string, maxLimit int64) *v1.Node { + if maxLimit <= 0 { + glog.V(4).Infof("skipping adding attach limit for %s", driverName) + return node + } + + if node.Status.Capacity == nil { + node.Status.Capacity = v1.ResourceList{} + } + if node.Status.Allocatable == nil { + node.Status.Allocatable = v1.ResourceList{} + } + limitKeyName := util.GetCSIAttachLimitKey(driverName) + node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI) + node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI) + return node +} + +// Clones the given map and returns a new map with the given key and value added. +// Returns the given map, if annotationKey is empty. +func cloneAndAddAnnotation( + annotations map[string]string, + annotationKey, + annotationValue string) map[string]string { + if annotationKey == "" { + // Don't need to add an annotation. + return annotations + } + // Clone. + newAnnotations := map[string]string{} + for key, value := range annotations { + newAnnotations[key] = value + } + newAnnotations[annotationKey] = annotationValue + return newAnnotations +} + +func addNodeIdToNode(node *v1.Node, driverName string, csiDriverNodeId string) (*v1.Node, error) { + var previousAnnotationValue string + if node.ObjectMeta.Annotations != nil { + previousAnnotationValue = + node.ObjectMeta.Annotations[annotationKey] + glog.V(3).Infof( + "previousAnnotationValue=%q", previousAnnotationValue) + } + + existingDriverMap := map[string]string{} + if previousAnnotationValue != "" { + // Parse previousAnnotationValue as JSON + if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { + return node, fmt.Errorf( + "failed to parse node's %q annotation value (%q) err=%v", + annotationKey, + previousAnnotationValue, + err) + } + } + + if val, ok := existingDriverMap[driverName]; ok { + if val == csiDriverNodeId { + // Value already exists in node annotation, nothing more to do + glog.V(2).Infof( + "The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v", + driverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + return node, nil + } + } + + // Add/update annotation value + existingDriverMap[driverName] = csiDriverNodeId + jsonObj, err := json.Marshal(existingDriverMap) + if err != nil { + return node, fmt.Errorf( + "failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v", + driverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + } + + node.ObjectMeta.Annotations = cloneAndAddAnnotation( + node.ObjectMeta.Annotations, + annotationKey, + string(jsonObj)) + return node, nil +} + +func addLabelsAndLimits(nodeName string, nodeClient corev1.NodeInterface, driverName string, csiDriverNodeId string, maxLimit int64) error { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Retrieve the latest version of Node before attempting update, so that + // existing changes are not overwritten. RetryOnConflict uses + // exponential backoff to avoid exhausting the apiserver. + node, getErr := nodeClient.Get(nodeName, metav1.GetOptions{}) + if getErr != nil { + glog.Errorf("Failed to get latest version of Node: %v", getErr) + return getErr // do not wrap error + } + var labelErr error + node, labelErr = addNodeIdToNode(node, driverName, csiDriverNodeId) + if labelErr != nil { + return labelErr + } + node = addMaxAttachLimitToNode(node, driverName, maxLimit) + + _, updateErr := nodeClient.Update(node) + if updateErr == nil { + glog.V(2).Infof( + "Updated node %q successfully for CSI driver %q and CSI node name %q", + nodeName, + driverName, + csiDriverNodeId) + } + return updateErr // do not wrap error + }) + if retryErr != nil { + return fmt.Errorf("error setting attach limit and labels for %s with : %v", driverName, retryErr) + } + return nil +} diff --git a/pkg/volume/util/BUILD b/pkg/volume/util/BUILD index 049558ce989..a6b371ede0f 100644 --- a/pkg/volume/util/BUILD +++ b/pkg/volume/util/BUILD @@ -49,6 +49,7 @@ go_test( name = "go_default_test", srcs = [ "atomic_writer_test.go", + "attach_limit_test.go", "device_util_linux_test.go", "nested_volumes_test.go", "resize_util_test.go", @@ -57,6 +58,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/apis/core/install:go_default_library", + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/volume/util/attach_limit.go b/pkg/volume/util/attach_limit.go index 610f5f5b2cc..04bda52533e 100644 --- a/pkg/volume/util/attach_limit.go +++ b/pkg/volume/util/attach_limit.go @@ -16,6 +16,11 @@ limitations under the License. package util +import ( + "crypto/sha1" + "encoding/hex" +) + // This file is a common place holder for volume limit utility constants // shared between volume package and scheduler @@ -26,4 +31,25 @@ const ( AzureVolumeLimitKey = "attachable-volumes-azure-disk" // GCEVolumeLimitKey stores resource name that will store volume limits for GCE node GCEVolumeLimitKey = "attachable-volumes-gce-pd" + + // CSIAttachLimitPrefix defines prefix used for CSI volumes + CSIAttachLimitPrefix = "attachable-volumes-csi-" + + // ResourceNameLengthLimit stores maximum allowed Length for a ResourceName + ResourceNameLengthLimit = 63 ) + +// GetCSIAttachLimitKey returns limit key used for CSI volumes +func GetCSIAttachLimitKey(driverName string) string { + csiPrefixLength := len(CSIAttachLimitPrefix) + totalkeyLength := csiPrefixLength + len(driverName) + if totalkeyLength >= ResourceNameLengthLimit { + charsFromDriverName := driverName[:23] + hash := sha1.New() + hash.Write([]byte(driverName)) + hashed := hex.EncodeToString(hash.Sum(nil)) + hashed = hashed[:16] + return CSIAttachLimitPrefix + charsFromDriverName + hashed + } + return CSIAttachLimitPrefix + driverName +} diff --git a/pkg/volume/util/attach_limit_test.go b/pkg/volume/util/attach_limit_test.go new file mode 100644 index 00000000000..98e8e8d0331 --- /dev/null +++ b/pkg/volume/util/attach_limit_test.go @@ -0,0 +1,40 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "testing" + + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" +) + +func TestGetCSIAttachLimitKey(t *testing.T) { + // When driverName is less than 39 characters + csiLimitKey := GetCSIAttachLimitKey("com.amazon.ebs") + if csiLimitKey != "attachable-volumes-csi-com.amazon.ebs" { + t.Errorf("Expected com.amazon.ebs got %s", csiLimitKey) + } + + // When driver is longer than 39 chars + csiLimitKeyLonger := GetCSIAttachLimitKey("com.amazon.kubernetes.eks.ec2.ebs/csi-driver") + fmt.Println(csiLimitKeyLonger) + if !v1helper.IsAttachableVolumeResourceName(v1.ResourceName(csiLimitKeyLonger)) { + t.Errorf("Expected %s to have attachable prefix", csiLimitKeyLonger) + } +} diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 14b065bdf64..19678cf7212 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -140,6 +140,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { "GeneralPredicates", "MatchInterPodAffinity", "MaxAzureDiskVolumeCount", + "MaxCSIVolumeCountPred", "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "NoDiskConflict", From 8e4b33d1a8adb25bd2e255585394cc38b80c7ad0 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 22 Aug 2018 17:54:52 -0400 Subject: [PATCH 2/3] Move volume limit feature to beta --- pkg/features/kube_features.go | 4 ++-- pkg/scheduler/algorithm/predicates/BUILD | 1 + pkg/scheduler/core/equivalence/BUILD | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 45cfe6fda49..773469105af 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -292,7 +292,7 @@ const ( VolumeSubpath utilfeature.Feature = "VolumeSubpath" // owner: @gnufied - // alpha : v1.11 + // beta : v1.12 // // Add support for volume plugins to report node specific // volume limits @@ -375,7 +375,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS QOSReserved: {Default: false, PreRelease: utilfeature.Alpha}, ExpandPersistentVolumes: {Default: true, PreRelease: utilfeature.Beta}, ExpandInUsePersistentVolumes: {Default: false, PreRelease: utilfeature.Alpha}, - AttachVolumeLimit: {Default: false, PreRelease: utilfeature.Alpha}, + AttachVolumeLimit: {Default: false, PreRelease: utilfeature.Beta}, CPUManager: {Default: true, PreRelease: utilfeature.Beta}, ServiceNodeExclusion: {Default: false, PreRelease: utilfeature.Alpha}, MountContainers: {Default: false, PreRelease: utilfeature.Alpha}, diff --git a/pkg/scheduler/algorithm/predicates/BUILD b/pkg/scheduler/algorithm/predicates/BUILD index edd3446ab1d..3de5d312d15 100644 --- a/pkg/scheduler/algorithm/predicates/BUILD +++ b/pkg/scheduler/algorithm/predicates/BUILD @@ -9,6 +9,7 @@ load( go_library( name = "go_default_library", srcs = [ + "csi_volume_predicate.go", "error.go", "metadata.go", "predicates.go", diff --git a/pkg/scheduler/core/equivalence/BUILD b/pkg/scheduler/core/equivalence/BUILD index 1b9f81db3c8..2c56de7ebca 100644 --- a/pkg/scheduler/core/equivalence/BUILD +++ b/pkg/scheduler/core/equivalence/BUILD @@ -6,6 +6,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler/core/equivalence", visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/cache:go_default_library", @@ -13,6 +14,7 @@ go_library( "//pkg/util/hash:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) From fc61620db53a9a2ac91de032ea935f8392f64268 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 22 Aug 2018 19:31:24 -0400 Subject: [PATCH 3/3] Fix compatibility tests for scheduler --- hack/.golint_failures | 2 +- pkg/scheduler/algorithm/predicates/BUILD | 1 + .../predicates/csi_volume_predicate.go | 71 +++---- .../predicates/csi_volume_predicate_test.go | 179 ++++++++++++++++++ .../max_attachable_volume_predicate_test.go | 102 +++++----- .../defaults/compatibility_test.go | 124 ++++++++++++ pkg/volume/util/attach_limit_test.go | 21 +- 7 files changed, 411 insertions(+), 89 deletions(-) create mode 100644 pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go diff --git a/hack/.golint_failures b/hack/.golint_failures index 227530c25cc..f1be0c7a70b 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -406,7 +406,7 @@ pkg/volume/azure_file pkg/volume/cephfs pkg/volume/configmap pkg/volume/csi/fake -pkg/volume/csi/labelmanager +pkg/volume/csi/nodeupdater pkg/volume/empty_dir pkg/volume/fc pkg/volume/flexvolume diff --git a/pkg/scheduler/algorithm/predicates/BUILD b/pkg/scheduler/algorithm/predicates/BUILD index 3de5d312d15..e4de1bb63f9 100644 --- a/pkg/scheduler/algorithm/predicates/BUILD +++ b/pkg/scheduler/algorithm/predicates/BUILD @@ -47,6 +47,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "csi_volume_predicate_test.go", "max_attachable_volume_predicate_test.go", "metadata_test.go", "predicates_test.go", diff --git a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go index db6fdd39170..8e155ea2f18 100644 --- a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go +++ b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go @@ -114,41 +114,44 @@ func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( for _, vol := range volumes { // CSI volumes can only be used as persistent volumes - if vol.PersistentVolumeClaim != nil { - pvcName := vol.PersistentVolumeClaim.ClaimName - - if pvcName == "" { - return fmt.Errorf("PersistentVolumeClaim had no name") - } - - pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName) - - if err != nil { - glog.Errorf("Unable to look up PVC info for %s/%s", namespace, pvcName) - continue - } - - pvName := pvc.Spec.VolumeName - if pvName == "" { - glog.Errorf("Persistent volume had no name for claim %s/%s", namespace, pvcName) - continue - } - pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName) - - if err != nil { - glog.Errorf("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName) - continue - } - - csiSource := pv.Spec.PersistentVolumeSource.CSI - if csiSource == nil { - glog.V(4).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName) - continue - } - driverName := csiSource.Driver - volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName) - result[csiSource.VolumeHandle] = volumeLimitKey + if vol.PersistentVolumeClaim == nil { + continue } + pvcName := vol.PersistentVolumeClaim.ClaimName + + if pvcName == "" { + return fmt.Errorf("PersistentVolumeClaim had no name") + } + + pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName) + + if err != nil { + glog.V(4).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName) + continue + } + + pvName := pvc.Spec.VolumeName + // TODO - the actual handling of unbound PVCs will be fixed by late binding design. + if pvName == "" { + glog.V(4).Infof("Persistent volume had no name for claim %s/%s", namespace, pvcName) + continue + } + pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName) + + if err != nil { + glog.V(4).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName) + continue + } + + csiSource := pv.Spec.PersistentVolumeSource.CSI + if csiSource == nil { + glog.V(4).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName) + continue + } + driverName := csiSource.Driver + volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName) + result[csiSource.VolumeHandle] = volumeLimitKey + } return nil } diff --git a/pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go b/pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go new file mode 100644 index 00000000000..5e34088f4d9 --- /dev/null +++ b/pkg/scheduler/algorithm/predicates/csi_volume_predicate_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/scheduler/algorithm" +) + +func TestCSIVolumeCountPredicate(t *testing.T) { + // for pods with CSI pvcs + oneVolPod := &v1.Pod{ + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "csi-ebs", + }, + }, + }, + }, + }, + } + twoVolPod := &v1.Pod{ + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "cs-ebs-1", + }, + }, + }, + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "csi-ebs-2", + }, + }, + }, + }, + }, + } + + runningPod := &v1.Pod{ + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "csi-ebs-3", + }, + }, + }, + }, + }, + } + + tests := []struct { + newPod *v1.Pod + existingPods []*v1.Pod + filterName string + maxVols int + fits bool + test string + }{ + { + newPod: oneVolPod, + existingPods: []*v1.Pod{runningPod, twoVolPod}, + filterName: "csi-ebs", + maxVols: 4, + fits: true, + test: "fits when node capacity >= new pods CSI volume", + }, + { + newPod: oneVolPod, + existingPods: []*v1.Pod{runningPod, twoVolPod}, + filterName: "csi-ebs", + maxVols: 2, + fits: false, + test: "doesn't when node capacity <= pods CSI volume", + }, + } + + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AttachVolumeLimit, true)() + expectedFailureReasons := []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded} + // running attachable predicate tests with feature gate and limit present on nodes + for _, test := range tests { + node := getNodeWithPodAndVolumeLimits(test.existingPods, int64(test.maxVols), test.filterName) + pred := NewCSIMaxVolumeLimitPredicate(getFakeCSIPVInfo("csi-ebs", "csi-ebs"), getFakeCSIPVCInfo("csi-ebs")) + fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), node) + if err != nil { + t.Errorf("Using allocatable [%s]%s: unexpected error: %v", test.filterName, test.test, err) + } + if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) { + t.Errorf("Using allocatable [%s]%s: unexpected failure reasons: %v, want: %v", test.filterName, test.test, reasons, expectedFailureReasons) + } + if fits != test.fits { + t.Errorf("Using allocatable [%s]%s: expected %v, got %v", test.filterName, test.test, test.fits, fits) + } + } +} + +func getFakeCSIPVInfo(volumeName, driverName string) FakePersistentVolumeInfo { + return FakePersistentVolumeInfo{ + { + ObjectMeta: metav1.ObjectMeta{Name: volumeName}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + Driver: driverName, + VolumeHandle: volumeName, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-2"}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + Driver: driverName, + VolumeHandle: volumeName + "-2", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-3"}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + Driver: driverName, + VolumeHandle: volumeName + "-3", + }, + }, + }, + }, + } +} + +func getFakeCSIPVCInfo(volumeName string) FakePersistentVolumeClaimInfo { + return FakePersistentVolumeClaimInfo{ + { + ObjectMeta: metav1.ObjectMeta{Name: volumeName}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: volumeName}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-2"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: volumeName + "-2"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-3"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: volumeName + "-3"}, + }, + } +} diff --git a/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go b/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go index c03051415bc..57a3fcca9b3 100644 --- a/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go +++ b/pkg/scheduler/algorithm/predicates/max_attachable_volume_predicate_test.go @@ -742,60 +742,12 @@ func TestVolumeCountConflicts(t *testing.T) { }, } - pvInfo := func(filterName string) FakePersistentVolumeInfo { - return FakePersistentVolumeInfo{ - { - ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"}, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: strings.ToLower(filterName) + "Vol"}, - }, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"}, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{}, - }, - }, - } - } - - pvcInfo := func(filterName string) FakePersistentVolumeClaimInfo { - return FakePersistentVolumeClaimInfo{ - { - ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"}, - Spec: v1.PersistentVolumeClaimSpec{VolumeName: "some" + filterName + "Vol"}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"}, - Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someNon" + filterName + "Vol"}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "pvcWithDeletedPV"}, - Spec: v1.PersistentVolumeClaimSpec{VolumeName: "pvcWithDeletedPV"}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "anotherPVCWithDeletedPV"}, - Spec: v1.PersistentVolumeClaimSpec{VolumeName: "anotherPVCWithDeletedPV"}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "unboundPVC"}, - Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "anotherUnboundPVC"}, - Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""}, - }, - } - } - expectedFailureReasons := []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded} // running attachable predicate tests without feature gate and no limit present on nodes for _, test := range tests { os.Setenv(KubeMaxPDVols, strconv.Itoa(test.maxVols)) - pred := NewMaxPDVolumeCountPredicate(test.filterName, pvInfo(test.filterName), pvcInfo(test.filterName)) + pred := NewMaxPDVolumeCountPredicate(test.filterName, getFakePVInfo(test.filterName), getFakePVCInfo(test.filterName)) fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), schedulercache.NewNodeInfo(test.existingPods...)) if err != nil { t.Errorf("[%s]%s: unexpected error: %v", test.filterName, test.test, err) @@ -813,7 +765,7 @@ func TestVolumeCountConflicts(t *testing.T) { // running attachable predicate tests with feature gate and limit present on nodes for _, test := range tests { node := getNodeWithPodAndVolumeLimits(test.existingPods, int64(test.maxVols), test.filterName) - pred := NewMaxPDVolumeCountPredicate(test.filterName, pvInfo(test.filterName), pvcInfo(test.filterName)) + pred := NewMaxPDVolumeCountPredicate(test.filterName, getFakePVInfo(test.filterName), getFakePVCInfo(test.filterName)) fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), node) if err != nil { t.Errorf("Using allocatable [%s]%s: unexpected error: %v", test.filterName, test.test, err) @@ -827,6 +779,54 @@ func TestVolumeCountConflicts(t *testing.T) { } } +func getFakePVInfo(filterName string) FakePersistentVolumeInfo { + return FakePersistentVolumeInfo{ + { + ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: strings.ToLower(filterName) + "Vol"}, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{}, + }, + }, + } +} + +func getFakePVCInfo(filterName string) FakePersistentVolumeClaimInfo { + return FakePersistentVolumeClaimInfo{ + { + ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: "some" + filterName + "Vol"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someNon" + filterName + "Vol"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "pvcWithDeletedPV"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: "pvcWithDeletedPV"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "anotherPVCWithDeletedPV"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: "anotherPVCWithDeletedPV"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "unboundPVC"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "anotherUnboundPVC"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""}, + }, + } +} + func TestMaxVolumeFunc(t *testing.T) { node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -867,6 +867,6 @@ func getVolumeLimitKey(filterType string) v1.ResourceName { case AzureDiskVolumeFilterType: return v1.ResourceName(volumeutil.AzureVolumeLimitKey) default: - return "" + return v1.ResourceName(volumeutil.GetCSIAttachLimitKey(filterType)) } } diff --git a/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index a1c16704215..9e2262ee306 100644 --- a/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -814,6 +814,130 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }}, }, }, + // Do not change this JSON after the corresponding release has been tagged. + // A failure indicates backwards compatibility with the specified release was broken. + "1.12": { + JSON: `{ + "kind": "Policy", + "apiVersion": "v1", + "predicates": [ + {"name": "MatchNodeSelector"}, + {"name": "PodFitsResources"}, + {"name": "PodFitsHostPorts"}, + {"name": "HostName"}, + {"name": "NoDiskConflict"}, + {"name": "NoVolumeZoneConflict"}, + {"name": "PodToleratesNodeTaints"}, + {"name": "CheckNodeMemoryPressure"}, + {"name": "CheckNodeDiskPressure"}, + {"name": "CheckNodePIDPressure"}, + {"name": "CheckNodeCondition"}, + {"name": "MaxEBSVolumeCount"}, + {"name": "MaxGCEPDVolumeCount"}, + {"name": "MaxAzureDiskVolumeCount"}, + {"name": "MaxCSIVolumeCountPred"}, + {"name": "MatchInterPodAffinity"}, + {"name": "GeneralPredicates"}, + {"name": "CheckVolumeBinding"}, + {"name": "TestServiceAffinity", "argument": {"serviceAffinity" : {"labels" : ["region"]}}}, + {"name": "TestLabelsPresence", "argument": {"labelsPresence" : {"labels" : ["foo"], "presence":true}}} + ],"priorities": [ + {"name": "EqualPriority", "weight": 2}, + {"name": "ImageLocalityPriority", "weight": 2}, + {"name": "LeastRequestedPriority", "weight": 2}, + {"name": "BalancedResourceAllocation", "weight": 2}, + {"name": "SelectorSpreadPriority", "weight": 2}, + {"name": "NodePreferAvoidPodsPriority", "weight": 2}, + {"name": "NodeAffinityPriority", "weight": 2}, + {"name": "TaintTolerationPriority", "weight": 2}, + {"name": "InterPodAffinityPriority", "weight": 2}, + {"name": "MostRequestedPriority", "weight": 2}, + { + "name": "RequestedToCapacityRatioPriority", + "weight": 2, + "argument": { + "requestedToCapacityRatioArguments": { + "shape": [ + {"utilization": 0, "score": 0}, + {"utilization": 50, "score": 7} + ] + } + }} + ],"extenders": [{ + "urlPrefix": "/prefix", + "filterVerb": "filter", + "prioritizeVerb": "prioritize", + "weight": 1, + "bindVerb": "bind", + "enableHttps": true, + "tlsConfig": {"Insecure":true}, + "httpTimeout": 1, + "nodeCacheCapable": true, + "managedResources": [{"name":"example.com/foo","ignoredByScheduler":true}], + "ignorable":true + }] + }`, + ExpectedPolicy: schedulerapi.Policy{ + Predicates: []schedulerapi.PredicatePolicy{ + {Name: "MatchNodeSelector"}, + {Name: "PodFitsResources"}, + {Name: "PodFitsHostPorts"}, + {Name: "HostName"}, + {Name: "NoDiskConflict"}, + {Name: "NoVolumeZoneConflict"}, + {Name: "PodToleratesNodeTaints"}, + {Name: "CheckNodeMemoryPressure"}, + {Name: "CheckNodeDiskPressure"}, + {Name: "CheckNodePIDPressure"}, + {Name: "CheckNodeCondition"}, + {Name: "MaxEBSVolumeCount"}, + {Name: "MaxGCEPDVolumeCount"}, + {Name: "MaxAzureDiskVolumeCount"}, + {Name: "MaxCSIVolumeCountPred"}, + {Name: "MatchInterPodAffinity"}, + {Name: "GeneralPredicates"}, + {Name: "CheckVolumeBinding"}, + {Name: "TestServiceAffinity", Argument: &schedulerapi.PredicateArgument{ServiceAffinity: &schedulerapi.ServiceAffinity{Labels: []string{"region"}}}}, + {Name: "TestLabelsPresence", Argument: &schedulerapi.PredicateArgument{LabelsPresence: &schedulerapi.LabelsPresence{Labels: []string{"foo"}, Presence: true}}}, + }, + Priorities: []schedulerapi.PriorityPolicy{ + {Name: "EqualPriority", Weight: 2}, + {Name: "ImageLocalityPriority", Weight: 2}, + {Name: "LeastRequestedPriority", Weight: 2}, + {Name: "BalancedResourceAllocation", Weight: 2}, + {Name: "SelectorSpreadPriority", Weight: 2}, + {Name: "NodePreferAvoidPodsPriority", Weight: 2}, + {Name: "NodeAffinityPriority", Weight: 2}, + {Name: "TaintTolerationPriority", Weight: 2}, + {Name: "InterPodAffinityPriority", Weight: 2}, + {Name: "MostRequestedPriority", Weight: 2}, + { + Name: "RequestedToCapacityRatioPriority", + Weight: 2, + Argument: &schedulerapi.PriorityArgument{ + RequestedToCapacityRatioArguments: &schedulerapi.RequestedToCapacityRatioArguments{ + UtilizationShape: []schedulerapi.UtilizationShapePoint{ + {Utilization: 0, Score: 0}, + {Utilization: 50, Score: 7}, + }}, + }, + }, + }, + ExtenderConfigs: []schedulerapi.ExtenderConfig{{ + URLPrefix: "/prefix", + FilterVerb: "filter", + PrioritizeVerb: "prioritize", + Weight: 1, + BindVerb: "bind", // 1.11 restored case-sensitivity, but allowed either "BindVerb" or "bindVerb" + EnableHTTPS: true, + TLSConfig: &restclient.TLSClientConfig{Insecure: true}, + HTTPTimeout: 1, + NodeCacheCapable: true, + ManagedResources: []schedulerapi.ExtenderManagedResource{{Name: v1.ResourceName("example.com/foo"), IgnoredByScheduler: true}}, + Ignorable: true, + }}, + }, + }, } registeredPredicates := sets.NewString(factory.ListRegisteredFitPredicates()...) diff --git a/pkg/volume/util/attach_limit_test.go b/pkg/volume/util/attach_limit_test.go index 98e8e8d0331..e298f197bf4 100644 --- a/pkg/volume/util/attach_limit_test.go +++ b/pkg/volume/util/attach_limit_test.go @@ -17,7 +17,8 @@ limitations under the License. package util import ( - "fmt" + "crypto/sha1" + "encoding/hex" "testing" "k8s.io/api/core/v1" @@ -32,9 +33,23 @@ func TestGetCSIAttachLimitKey(t *testing.T) { } // When driver is longer than 39 chars - csiLimitKeyLonger := GetCSIAttachLimitKey("com.amazon.kubernetes.eks.ec2.ebs/csi-driver") - fmt.Println(csiLimitKeyLonger) + longDriverName := "com.amazon.kubernetes.eks.ec2.ebs/csi-driver" + csiLimitKeyLonger := GetCSIAttachLimitKey(longDriverName) if !v1helper.IsAttachableVolumeResourceName(v1.ResourceName(csiLimitKeyLonger)) { t.Errorf("Expected %s to have attachable prefix", csiLimitKeyLonger) } + + expectedCSIKey := getDriverHash(longDriverName) + if csiLimitKeyLonger != expectedCSIKey { + t.Errorf("Expected limit to be %s got %s", expectedCSIKey, csiLimitKeyLonger) + } +} + +func getDriverHash(driverName string) string { + charsFromDriverName := driverName[:23] + hash := sha1.New() + hash.Write([]byte(driverName)) + hashed := hex.EncodeToString(hash.Sum(nil)) + hashed = hashed[:16] + return CSIAttachLimitPrefix + charsFromDriverName + hashed }