Merge pull request #67731 from gnufied/fix-csi-attach-limit

Automatic merge from submit-queue (batch tested with PRs 68161, 68023, 67909, 67955, 67731). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Fix csi attach limit

Add support for volume limits for CSI.

xref: https://github.com/kubernetes/community/pull/2051

```release-note
Add support for volume attach limits for CSI volumes
```
This commit is contained in:
Kubernetes Submit Queue 2018-09-05 14:51:55 -07:00 committed by GitHub
commit ca43f007a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 842 additions and 319 deletions

View File

@ -408,7 +408,7 @@ pkg/volume/azure_file
pkg/volume/cephfs
pkg/volume/configmap
pkg/volume/csi/fake
pkg/volume/csi/labelmanager
pkg/volume/csi/nodeupdater
pkg/volume/empty_dir
pkg/volume/fc
pkg/volume/flexvolume

View File

@ -303,7 +303,7 @@ const (
VolumeSubpath utilfeature.Feature = "VolumeSubpath"
// owner: @gnufied
// alpha : v1.11
// beta : v1.12
//
// Add support for volume plugins to report node specific
// volume limits
@ -421,7 +421,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
QOSReserved: {Default: false, PreRelease: utilfeature.Alpha},
ExpandPersistentVolumes: {Default: true, PreRelease: utilfeature.Beta},
ExpandInUsePersistentVolumes: {Default: false, PreRelease: utilfeature.Alpha},
AttachVolumeLimit: {Default: false, PreRelease: utilfeature.Alpha},
AttachVolumeLimit: {Default: false, PreRelease: utilfeature.Beta},
CPUManager: {Default: true, PreRelease: utilfeature.Beta},
CPUCFSQuotaPeriod: {Default: false, PreRelease: utilfeature.Alpha},
ServiceNodeExclusion: {Default: false, PreRelease: utilfeature.Alpha},

View File

@ -9,6 +9,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"csi_volume_predicate.go",
"error.go",
"metadata.go",
"predicates.go",
@ -46,6 +47,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"csi_volume_predicate_test.go",
"max_attachable_volume_predicate_test.go",
"metadata_test.go",
"predicates_test.go",

View File

@ -0,0 +1,157 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package predicates
import (
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes
type CSIMaxVolumeLimitChecker struct {
pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo
}
// NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes
func NewCSIMaxVolumeLimitPredicate(
pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
c := &CSIMaxVolumeLimitChecker{
pvInfo: pvInfo,
pvcInfo: pvcInfo,
}
return c.attachableLimitPredicate
}
func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// if feature gate is disable we return
if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
return true, nil, nil
}
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
return true, nil, nil
}
nodeVolumeLimits := nodeInfo.VolumeLimits()
// if node does not have volume limits this predicate should exit
if len(nodeVolumeLimits) == 0 {
return true, nil, nil
}
// a map of unique volume name/csi volume handle and volume limit key
newVolumes := make(map[string]string)
if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
return false, nil, err
}
if len(newVolumes) == 0 {
return true, nil, nil
}
// a map of unique volume name/csi volume handle and volume limit key
attachedVolumes := make(map[string]string)
for _, existingPod := range nodeInfo.Pods() {
if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
return false, nil, err
}
}
newVolumeCount := map[string]int{}
attachedVolumeCount := map[string]int{}
for volumeName, volumeLimitKey := range attachedVolumes {
if _, ok := newVolumes[volumeName]; ok {
delete(newVolumes, volumeName)
}
attachedVolumeCount[volumeLimitKey]++
}
for _, volumeLimitKey := range newVolumes {
newVolumeCount[volumeLimitKey]++
}
for volumeLimitKey, count := range newVolumeCount {
maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
if ok {
currentVolumeCount := attachedVolumeCount[volumeLimitKey]
if currentVolumeCount+count > int(maxVolumeLimit) {
return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
}
}
}
return true, nil, nil
}
func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
volumes []v1.Volume, namespace string, result map[string]string) error {
for _, vol := range volumes {
// CSI volumes can only be used as persistent volumes
if vol.PersistentVolumeClaim == nil {
continue
}
pvcName := vol.PersistentVolumeClaim.ClaimName
if pvcName == "" {
return fmt.Errorf("PersistentVolumeClaim had no name")
}
pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
if err != nil {
glog.V(4).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName)
continue
}
pvName := pvc.Spec.VolumeName
// TODO - the actual handling of unbound PVCs will be fixed by late binding design.
if pvName == "" {
glog.V(4).Infof("Persistent volume had no name for claim %s/%s", namespace, pvcName)
continue
}
pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
if err != nil {
glog.V(4).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName)
continue
}
csiSource := pv.Spec.PersistentVolumeSource.CSI
if csiSource == nil {
glog.V(4).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName)
continue
}
driverName := csiSource.Driver
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
result[csiSource.VolumeHandle] = volumeLimitKey
}
return nil
}

View File

@ -0,0 +1,179 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package predicates
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
)
func TestCSIVolumeCountPredicate(t *testing.T) {
// for pods with CSI pvcs
oneVolPod := &v1.Pod{
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "csi-ebs",
},
},
},
},
},
}
twoVolPod := &v1.Pod{
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "cs-ebs-1",
},
},
},
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "csi-ebs-2",
},
},
},
},
},
}
runningPod := &v1.Pod{
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "csi-ebs-3",
},
},
},
},
},
}
tests := []struct {
newPod *v1.Pod
existingPods []*v1.Pod
filterName string
maxVols int
fits bool
test string
}{
{
newPod: oneVolPod,
existingPods: []*v1.Pod{runningPod, twoVolPod},
filterName: "csi-ebs",
maxVols: 4,
fits: true,
test: "fits when node capacity >= new pods CSI volume",
},
{
newPod: oneVolPod,
existingPods: []*v1.Pod{runningPod, twoVolPod},
filterName: "csi-ebs",
maxVols: 2,
fits: false,
test: "doesn't when node capacity <= pods CSI volume",
},
}
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AttachVolumeLimit, true)()
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}
// running attachable predicate tests with feature gate and limit present on nodes
for _, test := range tests {
node := getNodeWithPodAndVolumeLimits(test.existingPods, int64(test.maxVols), test.filterName)
pred := NewCSIMaxVolumeLimitPredicate(getFakeCSIPVInfo("csi-ebs", "csi-ebs"), getFakeCSIPVCInfo("csi-ebs"))
fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), node)
if err != nil {
t.Errorf("Using allocatable [%s]%s: unexpected error: %v", test.filterName, test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("Using allocatable [%s]%s: unexpected failure reasons: %v, want: %v", test.filterName, test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("Using allocatable [%s]%s: expected %v, got %v", test.filterName, test.test, test.fits, fits)
}
}
}
func getFakeCSIPVInfo(volumeName, driverName string) FakePersistentVolumeInfo {
return FakePersistentVolumeInfo{
{
ObjectMeta: metav1.ObjectMeta{Name: volumeName},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: driverName,
VolumeHandle: volumeName,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-2"},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: driverName,
VolumeHandle: volumeName + "-2",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-3"},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: driverName,
VolumeHandle: volumeName + "-3",
},
},
},
},
}
}
func getFakeCSIPVCInfo(volumeName string) FakePersistentVolumeClaimInfo {
return FakePersistentVolumeClaimInfo{
{
ObjectMeta: metav1.ObjectMeta{Name: volumeName},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: volumeName},
},
{
ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-2"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: volumeName + "-2"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: volumeName + "-3"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: volumeName + "-3"},
},
}
}

View File

@ -742,60 +742,12 @@ func TestVolumeCountConflicts(t *testing.T) {
},
}
pvInfo := func(filterName string) FakePersistentVolumeInfo {
return FakePersistentVolumeInfo{
{
ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: strings.ToLower(filterName) + "Vol"},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{},
},
},
}
}
pvcInfo := func(filterName string) FakePersistentVolumeClaimInfo {
return FakePersistentVolumeClaimInfo{
{
ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "some" + filterName + "Vol"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someNon" + filterName + "Vol"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pvcWithDeletedPV"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "pvcWithDeletedPV"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "anotherPVCWithDeletedPV"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "anotherPVCWithDeletedPV"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "unboundPVC"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "anotherUnboundPVC"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""},
},
}
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}
// running attachable predicate tests without feature gate and no limit present on nodes
for _, test := range tests {
os.Setenv(KubeMaxPDVols, strconv.Itoa(test.maxVols))
pred := NewMaxPDVolumeCountPredicate(test.filterName, pvInfo(test.filterName), pvcInfo(test.filterName))
pred := NewMaxPDVolumeCountPredicate(test.filterName, getFakePVInfo(test.filterName), getFakePVCInfo(test.filterName))
fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), schedulercache.NewNodeInfo(test.existingPods...))
if err != nil {
t.Errorf("[%s]%s: unexpected error: %v", test.filterName, test.test, err)
@ -813,7 +765,7 @@ func TestVolumeCountConflicts(t *testing.T) {
// running attachable predicate tests with feature gate and limit present on nodes
for _, test := range tests {
node := getNodeWithPodAndVolumeLimits(test.existingPods, int64(test.maxVols), test.filterName)
pred := NewMaxPDVolumeCountPredicate(test.filterName, pvInfo(test.filterName), pvcInfo(test.filterName))
pred := NewMaxPDVolumeCountPredicate(test.filterName, getFakePVInfo(test.filterName), getFakePVCInfo(test.filterName))
fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), node)
if err != nil {
t.Errorf("Using allocatable [%s]%s: unexpected error: %v", test.filterName, test.test, err)
@ -827,6 +779,54 @@ func TestVolumeCountConflicts(t *testing.T) {
}
}
func getFakePVInfo(filterName string) FakePersistentVolumeInfo {
return FakePersistentVolumeInfo{
{
ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: strings.ToLower(filterName) + "Vol"},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{},
},
},
}
}
func getFakePVCInfo(filterName string) FakePersistentVolumeClaimInfo {
return FakePersistentVolumeClaimInfo{
{
ObjectMeta: metav1.ObjectMeta{Name: "some" + filterName + "Vol"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "some" + filterName + "Vol"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "someNon" + filterName + "Vol"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someNon" + filterName + "Vol"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pvcWithDeletedPV"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "pvcWithDeletedPV"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "anotherPVCWithDeletedPV"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "anotherPVCWithDeletedPV"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "unboundPVC"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "anotherUnboundPVC"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: ""},
},
}
}
func TestMaxVolumeFunc(t *testing.T) {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@ -867,6 +867,6 @@ func getVolumeLimitKey(filterType string) v1.ResourceName {
case AzureDiskVolumeFilterType:
return v1.ResourceName(volumeutil.AzureVolumeLimitKey)
default:
return ""
return v1.ResourceName(volumeutil.GetCSIAttachLimitKey(filterType))
}
}

View File

@ -85,6 +85,8 @@ const (
MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount"
// MaxAzureDiskVolumeCountPred defines the name of predicate MaxAzureDiskVolumeCount.
MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount"
// MaxCSIVolumeCountPred defines the predicate that decides how many CSI volumes should be attached
MaxCSIVolumeCountPred = "MaxCSIVolumeCountPred"
// NoVolumeZoneConflictPred defines the name of predicate NoVolumeZoneConflict.
NoVolumeZoneConflictPred = "NoVolumeZoneConflict"
// CheckNodeMemoryPressurePred defines the name of predicate CheckNodeMemoryPressure.
@ -137,7 +139,7 @@ var (
GeneralPred, HostNamePred, PodFitsHostPortsPred,
MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)

View File

@ -814,6 +814,130 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}},
},
},
// Do not change this JSON after the corresponding release has been tagged.
// A failure indicates backwards compatibility with the specified release was broken.
"1.12": {
JSON: `{
"kind": "Policy",
"apiVersion": "v1",
"predicates": [
{"name": "MatchNodeSelector"},
{"name": "PodFitsResources"},
{"name": "PodFitsHostPorts"},
{"name": "HostName"},
{"name": "NoDiskConflict"},
{"name": "NoVolumeZoneConflict"},
{"name": "PodToleratesNodeTaints"},
{"name": "CheckNodeMemoryPressure"},
{"name": "CheckNodeDiskPressure"},
{"name": "CheckNodePIDPressure"},
{"name": "CheckNodeCondition"},
{"name": "MaxEBSVolumeCount"},
{"name": "MaxGCEPDVolumeCount"},
{"name": "MaxAzureDiskVolumeCount"},
{"name": "MaxCSIVolumeCountPred"},
{"name": "MatchInterPodAffinity"},
{"name": "GeneralPredicates"},
{"name": "CheckVolumeBinding"},
{"name": "TestServiceAffinity", "argument": {"serviceAffinity" : {"labels" : ["region"]}}},
{"name": "TestLabelsPresence", "argument": {"labelsPresence" : {"labels" : ["foo"], "presence":true}}}
],"priorities": [
{"name": "EqualPriority", "weight": 2},
{"name": "ImageLocalityPriority", "weight": 2},
{"name": "LeastRequestedPriority", "weight": 2},
{"name": "BalancedResourceAllocation", "weight": 2},
{"name": "SelectorSpreadPriority", "weight": 2},
{"name": "NodePreferAvoidPodsPriority", "weight": 2},
{"name": "NodeAffinityPriority", "weight": 2},
{"name": "TaintTolerationPriority", "weight": 2},
{"name": "InterPodAffinityPriority", "weight": 2},
{"name": "MostRequestedPriority", "weight": 2},
{
"name": "RequestedToCapacityRatioPriority",
"weight": 2,
"argument": {
"requestedToCapacityRatioArguments": {
"shape": [
{"utilization": 0, "score": 0},
{"utilization": 50, "score": 7}
]
}
}}
],"extenders": [{
"urlPrefix": "/prefix",
"filterVerb": "filter",
"prioritizeVerb": "prioritize",
"weight": 1,
"bindVerb": "bind",
"enableHttps": true,
"tlsConfig": {"Insecure":true},
"httpTimeout": 1,
"nodeCacheCapable": true,
"managedResources": [{"name":"example.com/foo","ignoredByScheduler":true}],
"ignorable":true
}]
}`,
ExpectedPolicy: schedulerapi.Policy{
Predicates: []schedulerapi.PredicatePolicy{
{Name: "MatchNodeSelector"},
{Name: "PodFitsResources"},
{Name: "PodFitsHostPorts"},
{Name: "HostName"},
{Name: "NoDiskConflict"},
{Name: "NoVolumeZoneConflict"},
{Name: "PodToleratesNodeTaints"},
{Name: "CheckNodeMemoryPressure"},
{Name: "CheckNodeDiskPressure"},
{Name: "CheckNodePIDPressure"},
{Name: "CheckNodeCondition"},
{Name: "MaxEBSVolumeCount"},
{Name: "MaxGCEPDVolumeCount"},
{Name: "MaxAzureDiskVolumeCount"},
{Name: "MaxCSIVolumeCountPred"},
{Name: "MatchInterPodAffinity"},
{Name: "GeneralPredicates"},
{Name: "CheckVolumeBinding"},
{Name: "TestServiceAffinity", Argument: &schedulerapi.PredicateArgument{ServiceAffinity: &schedulerapi.ServiceAffinity{Labels: []string{"region"}}}},
{Name: "TestLabelsPresence", Argument: &schedulerapi.PredicateArgument{LabelsPresence: &schedulerapi.LabelsPresence{Labels: []string{"foo"}, Presence: true}}},
},
Priorities: []schedulerapi.PriorityPolicy{
{Name: "EqualPriority", Weight: 2},
{Name: "ImageLocalityPriority", Weight: 2},
{Name: "LeastRequestedPriority", Weight: 2},
{Name: "BalancedResourceAllocation", Weight: 2},
{Name: "SelectorSpreadPriority", Weight: 2},
{Name: "NodePreferAvoidPodsPriority", Weight: 2},
{Name: "NodeAffinityPriority", Weight: 2},
{Name: "TaintTolerationPriority", Weight: 2},
{Name: "InterPodAffinityPriority", Weight: 2},
{Name: "MostRequestedPriority", Weight: 2},
{
Name: "RequestedToCapacityRatioPriority",
Weight: 2,
Argument: &schedulerapi.PriorityArgument{
RequestedToCapacityRatioArguments: &schedulerapi.RequestedToCapacityRatioArguments{
UtilizationShape: []schedulerapi.UtilizationShapePoint{
{Utilization: 0, Score: 0},
{Utilization: 50, Score: 7},
}},
},
},
},
ExtenderConfigs: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
PrioritizeVerb: "prioritize",
Weight: 1,
BindVerb: "bind", // 1.11 restored case-sensitivity, but allowed either "BindVerb" or "bindVerb"
EnableHTTPS: true,
TLSConfig: &restclient.TLSClientConfig{Insecure: true},
HTTPTimeout: 1,
NodeCacheCapable: true,
ManagedResources: []schedulerapi.ExtenderManagedResource{{Name: v1.ResourceName("example.com/foo"), IgnoredByScheduler: true}},
Ignorable: true,
}},
},
},
}
registeredPredicates := sets.NewString(factory.ListRegisteredFitPredicates()...)

View File

@ -133,6 +133,12 @@ func defaultPredicates() sets.String {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
factory.RegisterFitPredicateFactory(
predicates.MaxCSIVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewCSIMaxVolumeLimitPredicate(args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinityPred,

View File

@ -72,6 +72,7 @@ func TestDefaultPredicates(t *testing.T) {
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,

View File

@ -6,6 +6,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/core/equivalence",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/cache:go_default_library",
@ -13,6 +14,7 @@ go_library(
"//pkg/util/hash:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)

View File

@ -23,16 +23,16 @@ import (
"hash/fnv"
"sync"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"github.com/golang/glog"
)
// Cache is a thread safe map saves and reuses the output of predicate functions,
@ -136,8 +136,16 @@ func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName str
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decisioc.
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
invalidPredicates.Insert(
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
} else {
// We do not consider CSI volumes here because CSI
// volumes can not be used inline.
if vol.AWSElasticBlockStore != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
}

View File

@ -489,6 +489,10 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
}
if pv.Spec.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
// If PV contains zone related label, it may impact cached NoVolumeZoneConflict
for k := range pv.Labels {
if isZoneRegionLabel(k) {
@ -565,6 +569,10 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim
// The bound volume type may change
invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
// The bound volume's label may change
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
@ -585,6 +593,10 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
}
// The bound volume type may change
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
}
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)

View File

@ -16,7 +16,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/labelmanager:go_default_library",
"//pkg/volume/csi/nodeupdater:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
@ -83,7 +83,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/volume/csi/fake:all-srcs",
"//pkg/volume/csi/labelmanager:all-srcs",
"//pkg/volume/csi/nodeupdater:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

View File

@ -40,7 +40,7 @@ import (
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
"k8s.io/kubernetes/pkg/volume/csi/nodeupdater"
)
const (
@ -94,7 +94,7 @@ type csiDriversStore struct {
// corresponding sockets
var csiDrivers csiDriversStore
var lm labelmanager.Interface
var nodeUpdater nodeupdater.Interface
// RegistrationCallback is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car.
@ -118,13 +118,13 @@ func RegistrationCallback(pluginName string, endpoint string, versions []string,
// TODO (verult) retry with exponential backoff, possibly added in csi client library.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
driverNodeID, _, _, err := csi.NodeGetInfo(ctx)
driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx)
if err != nil {
return nil, fmt.Errorf("error during CSI NodeGetInfo() call: %v", err)
}
// Calling nodeLabelManager to update annotations and labels for newly registered CSI driver
err = lm.AddLabels(pluginName, driverNodeID)
err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode)
if err != nil {
// Unregister the driver and return error
csiDrivers.Lock()
@ -142,7 +142,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient())
nodeUpdater = nodeupdater.NewNodeUpdater(host.GetNodeName(), host.GetKubeClient())
csiClient := host.GetCSIClient()
if csiClient != nil {

View File

@ -1,250 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package labelmanager includes internal functions used to add/delete labels to
// kubernetes nodes for corresponding CSI drivers
package labelmanager // import "k8s.io/kubernetes/pkg/volume/csi/labelmanager"
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
)
const (
// Name of node annotation that contains JSON map of driver names to node
// names
annotationKey = "csi.volume.kubernetes.io/nodeid"
)
// labelManagementStruct is struct of channels used for communication between the driver registration
// code and the go routine responsible for managing the node's labels
type labelManagerStruct struct {
nodeName types.NodeName
k8s kubernetes.Interface
}
// Interface implements an interface for managing labels of a node
type Interface interface {
AddLabels(driverName string, driverNodeId string) error
}
// NewLabelManager initializes labelManagerStruct and returns available interfaces
func NewLabelManager(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface {
return labelManagerStruct{
nodeName: nodeName,
k8s: kubeClient,
}
}
// nodeLabelManager waits for labeling requests initiated by the driver's registration
// process.
func (lm labelManagerStruct) AddLabels(driverName string, driverNodeId string) error {
err := verifyAndAddNodeId(string(lm.nodeName), lm.k8s.CoreV1().Nodes(), driverName, driverNodeId)
if err != nil {
return fmt.Errorf("failed to update node %s's annotation with error: %+v", lm.nodeName, err)
}
return nil
}
// Clones the given map and returns a new map with the given key and value added.
// Returns the given map, if annotationKey is empty.
func cloneAndAddAnnotation(
annotations map[string]string,
annotationKey,
annotationValue string) map[string]string {
if annotationKey == "" {
// Don't need to add an annotation.
return annotations
}
// Clone.
newAnnotations := map[string]string{}
for key, value := range annotations {
newAnnotations[key] = value
}
newAnnotations[annotationKey] = annotationValue
return newAnnotations
}
func verifyAndAddNodeId(
k8sNodeName string,
k8sNodesClient corev1.NodeInterface,
csiDriverName string,
csiDriverNodeId string) error {
// Add or update annotation on Node object
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("Failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}
var previousAnnotationValue string
if result.ObjectMeta.Annotations != nil {
previousAnnotationValue =
result.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}
existingDriverMap := map[string]string{}
if previousAnnotationValue != "" {
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}
}
if val, ok := existingDriverMap[csiDriverName]; ok {
if val == csiDriverNodeId {
// Value already exists in node annotation, nothing more to do
glog.V(2).Infof(
"The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v",
csiDriverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
return nil
}
}
// Add/update annotation value
existingDriverMap[csiDriverName] = csiDriverNodeId
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return fmt.Errorf(
"failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v",
csiDriverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
}
result.ObjectMeta.Annotations = cloneAndAddAnnotation(
result.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
_, updateErr := k8sNodesClient.Update(result)
if updateErr == nil {
glog.V(2).Infof(
"Updated node %q successfully for CSI driver %q and CSI node name %q",
k8sNodeName,
csiDriverName,
csiDriverNodeId)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("node update failed: %v", retryErr)
}
return nil
}
// Fetches Kubernetes node API object corresponding to k8sNodeName.
// If the csiDriverName is present in the node annotation, it is removed.
func verifyAndDeleteNodeId(
k8sNodeName string,
k8sNodesClient corev1.NodeInterface,
csiDriverName string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}
var previousAnnotationValue string
if result.ObjectMeta.Annotations != nil {
previousAnnotationValue =
result.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}
existingDriverMap := map[string]string{}
if previousAnnotationValue == "" {
// Value already exists in node annotation, nothing more to do
glog.V(2).Infof(
"The key %q does not exist in node %q annotation, no need to cleanup.",
csiDriverName,
annotationKey)
return nil
}
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}
if _, ok := existingDriverMap[csiDriverName]; !ok {
// Value already exists in node annotation, nothing more to do
glog.V(2).Infof(
"The key %q does not eixst in node %q annotation, no need to cleanup: %v",
csiDriverName,
annotationKey,
previousAnnotationValue)
return nil
}
// Add/update annotation value
delete(existingDriverMap, csiDriverName)
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return fmt.Errorf(
"failed while trying to remove key %q from node %q annotation. Existing data: %v",
csiDriverName,
annotationKey,
previousAnnotationValue)
}
result.ObjectMeta.Annotations = cloneAndAddAnnotation(
result.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
_, updateErr := k8sNodesClient.Update(result)
if updateErr == nil {
fmt.Printf(
"Updated node %q annotation to remove CSI driver %q.",
k8sNodeName,
csiDriverName)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("node update failed: %v", retryErr)
}
return nil
}

View File

@ -2,10 +2,13 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["labelmanager.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager",
srcs = ["nodeupdater.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/nodeupdater",
visibility = ["//visibility:public"],
deps = [
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -0,0 +1,193 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package nodeupdater includes internal functions used to add/delete labels to
// kubernetes nodes for corresponding CSI drivers
package nodeupdater // import "k8s.io/kubernetes/pkg/volume/csi/nodeupdater"
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/volume/util"
)
const (
// Name of node annotation that contains JSON map of driver names to node
// names
annotationKey = "csi.volume.kubernetes.io/nodeid"
)
// labelManagementStruct is struct of channels used for communication between the driver registration
// code and the go routine responsible for managing the node's labels
type nodeUpdateStruct struct {
nodeName types.NodeName
k8s kubernetes.Interface
}
// Interface implements an interface for managing labels of a node
type Interface interface {
AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error
}
// NewNodeupdater initializes nodeUpdateStruct and returns available interfaces
func NewNodeUpdater(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface {
return nodeUpdateStruct{
nodeName: nodeName,
k8s: kubeClient,
}
}
// AddLabelsAndLimits nodeUpdater waits for labeling requests initiated by the driver's registration
// process and updates labels and attach limits
func (nodeUpdater nodeUpdateStruct) AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error {
err := addLabelsAndLimits(string(nodeUpdater.nodeName), nodeUpdater.k8s.CoreV1().Nodes(), driverName, driverNodeId, maxLimit)
if err != nil {
return err
}
return nil
}
func addMaxAttachLimitToNode(node *v1.Node, driverName string, maxLimit int64) *v1.Node {
if maxLimit <= 0 {
glog.V(4).Infof("skipping adding attach limit for %s", driverName)
return node
}
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
limitKeyName := util.GetCSIAttachLimitKey(driverName)
node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
return node
}
// Clones the given map and returns a new map with the given key and value added.
// Returns the given map, if annotationKey is empty.
func cloneAndAddAnnotation(
annotations map[string]string,
annotationKey,
annotationValue string) map[string]string {
if annotationKey == "" {
// Don't need to add an annotation.
return annotations
}
// Clone.
newAnnotations := map[string]string{}
for key, value := range annotations {
newAnnotations[key] = value
}
newAnnotations[annotationKey] = annotationValue
return newAnnotations
}
func addNodeIdToNode(node *v1.Node, driverName string, csiDriverNodeId string) (*v1.Node, error) {
var previousAnnotationValue string
if node.ObjectMeta.Annotations != nil {
previousAnnotationValue =
node.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}
existingDriverMap := map[string]string{}
if previousAnnotationValue != "" {
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return node, fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}
}
if val, ok := existingDriverMap[driverName]; ok {
if val == csiDriverNodeId {
// Value already exists in node annotation, nothing more to do
glog.V(2).Infof(
"The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v",
driverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
return node, nil
}
}
// Add/update annotation value
existingDriverMap[driverName] = csiDriverNodeId
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return node, fmt.Errorf(
"failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v",
driverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
}
node.ObjectMeta.Annotations = cloneAndAddAnnotation(
node.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
return node, nil
}
func addLabelsAndLimits(nodeName string, nodeClient corev1.NodeInterface, driverName string, csiDriverNodeId string, maxLimit int64) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
node, getErr := nodeClient.Get(nodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("Failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}
var labelErr error
node, labelErr = addNodeIdToNode(node, driverName, csiDriverNodeId)
if labelErr != nil {
return labelErr
}
node = addMaxAttachLimitToNode(node, driverName, maxLimit)
_, updateErr := nodeClient.Update(node)
if updateErr == nil {
glog.V(2).Infof(
"Updated node %q successfully for CSI driver %q and CSI node name %q",
nodeName,
driverName,
csiDriverNodeId)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("error setting attach limit and labels for %s with : %v", driverName, retryErr)
}
return nil
}

View File

@ -49,6 +49,7 @@ go_test(
name = "go_default_test",
srcs = [
"atomic_writer_test.go",
"attach_limit_test.go",
"device_util_linux_test.go",
"nested_volumes_test.go",
"resize_util_test.go",
@ -57,6 +58,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/core/install:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/slice:go_default_library",

View File

@ -16,6 +16,11 @@ limitations under the License.
package util
import (
"crypto/sha1"
"encoding/hex"
)
// This file is a common place holder for volume limit utility constants
// shared between volume package and scheduler
@ -26,4 +31,25 @@ const (
AzureVolumeLimitKey = "attachable-volumes-azure-disk"
// GCEVolumeLimitKey stores resource name that will store volume limits for GCE node
GCEVolumeLimitKey = "attachable-volumes-gce-pd"
// CSIAttachLimitPrefix defines prefix used for CSI volumes
CSIAttachLimitPrefix = "attachable-volumes-csi-"
// ResourceNameLengthLimit stores maximum allowed Length for a ResourceName
ResourceNameLengthLimit = 63
)
// GetCSIAttachLimitKey returns limit key used for CSI volumes
func GetCSIAttachLimitKey(driverName string) string {
csiPrefixLength := len(CSIAttachLimitPrefix)
totalkeyLength := csiPrefixLength + len(driverName)
if totalkeyLength >= ResourceNameLengthLimit {
charsFromDriverName := driverName[:23]
hash := sha1.New()
hash.Write([]byte(driverName))
hashed := hex.EncodeToString(hash.Sum(nil))
hashed = hashed[:16]
return CSIAttachLimitPrefix + charsFromDriverName + hashed
}
return CSIAttachLimitPrefix + driverName
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"crypto/sha1"
"encoding/hex"
"testing"
"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
func TestGetCSIAttachLimitKey(t *testing.T) {
// When driverName is less than 39 characters
csiLimitKey := GetCSIAttachLimitKey("com.amazon.ebs")
if csiLimitKey != "attachable-volumes-csi-com.amazon.ebs" {
t.Errorf("Expected com.amazon.ebs got %s", csiLimitKey)
}
// When driver is longer than 39 chars
longDriverName := "com.amazon.kubernetes.eks.ec2.ebs/csi-driver"
csiLimitKeyLonger := GetCSIAttachLimitKey(longDriverName)
if !v1helper.IsAttachableVolumeResourceName(v1.ResourceName(csiLimitKeyLonger)) {
t.Errorf("Expected %s to have attachable prefix", csiLimitKeyLonger)
}
expectedCSIKey := getDriverHash(longDriverName)
if csiLimitKeyLonger != expectedCSIKey {
t.Errorf("Expected limit to be %s got %s", expectedCSIKey, csiLimitKeyLonger)
}
}
func getDriverHash(driverName string) string {
charsFromDriverName := driverName[:23]
hash := sha1.New()
hash.Write([]byte(driverName))
hashed := hex.EncodeToString(hash.Sum(nil))
hashed = hashed[:16]
return CSIAttachLimitPrefix + charsFromDriverName + hashed
}

View File

@ -140,6 +140,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"GeneralPredicates",
"MatchInterPodAffinity",
"MaxAzureDiskVolumeCount",
"MaxCSIVolumeCountPred",
"MaxEBSVolumeCount",
"MaxGCEPDVolumeCount",
"NoDiskConflict",