diff --git a/pkg/apis/core/v1/helper/helpers.go b/pkg/apis/core/v1/helper/helpers.go index c8dbbdce92b..4b1da9d9204 100644 --- a/pkg/apis/core/v1/helper/helpers.go +++ b/pkg/apis/core/v1/helper/helpers.go @@ -283,6 +283,20 @@ func NodeSelectorRequirementsAsFieldSelector(nsm []v1.NodeSelectorRequirement) ( return fields.AndSelectors(selectors...), nil } +// NodeSelectorRequirementKeysExistInNodeSelectorTerms checks if a NodeSelectorTerm with key is already specified in terms +func NodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []v1.NodeSelectorRequirement, terms []v1.NodeSelectorTerm) bool { + for _, req := range reqs { + for _, term := range terms { + for _, r := range term.MatchExpressions { + if r.Key == req.Key { + return true + } + } + } + } + return false +} + // MatchNodeSelectorTerms checks whether the node labels and fields match node selector terms in ORed; // nil or empty term matches no objects. func MatchNodeSelectorTerms( diff --git a/pkg/apis/core/v1/helper/helpers_test.go b/pkg/apis/core/v1/helper/helpers_test.go index 58992ab0a04..e914d273339 100644 --- a/pkg/apis/core/v1/helper/helpers_test.go +++ b/pkg/apis/core/v1/helper/helpers_test.go @@ -968,3 +968,193 @@ func TestMatchTopologySelectorTerms(t *testing.T) { }) } } + +func TestNodeSelectorRequirementKeyExistsInNodeSelectorTerms(t *testing.T) { + tests := []struct { + name string + reqs []v1.NodeSelectorRequirement + terms []v1.NodeSelectorTerm + exists bool + }{ + { + name: "empty set of keys in empty set of terms", + reqs: []v1.NodeSelectorRequirement{}, + terms: []v1.NodeSelectorTerm{}, + exists: false, + }, + { + name: "key existence in terms with all keys specified", + reqs: []v1.NodeSelectorRequirement{ + { + Key: "key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value1"}, + }, + { + Key: "key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value2"}, + }, + }, + terms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value2"}, + }, + { + Key: "key3", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value3"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value11, test-value12"}, + }, + { + Key: "key4", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value41, test-value42"}, + }, + }, + }, + }, + exists: true, + }, + { + name: "key existence in terms with one of the keys specfied", + reqs: []v1.NodeSelectorRequirement{ + { + Key: "key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value1"}, + }, + { + Key: "key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value2"}, + }, + { + Key: "key3", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value3"}, + }, + { + Key: "key6", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value6"}, + }, + }, + terms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value2"}, + }, { + Key: "key4", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value4"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "key5", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value5"}, + }, + }, + }, + }, + exists: true, + }, + { + name: "key existence in terms without any of the keys specified", + reqs: []v1.NodeSelectorRequirement{ + { + Key: "key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value2"}, + }, + { + Key: "key3", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value3"}, + }, + }, + terms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "key4", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value"}, + }, + { + Key: "key5", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "key6", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "key7", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value"}, + }, + { + Key: "key8", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value"}, + }, + }, + }, + }, + exists: false, + }, + { + name: "key existence in empty set of terms", + reqs: []v1.NodeSelectorRequirement{ + { + Key: "key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value2"}, + }, + { + Key: "key3", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value3"}, + }, + }, + terms: []v1.NodeSelectorTerm{}, + exists: false, + }, + } + for _, test := range tests { + keyExists := NodeSelectorRequirementKeysExistInNodeSelectorTerms(test.reqs, test.terms) + if test.exists != keyExists { + t.Errorf("test %s failed. Expected %v but got %v", test.name, test.exists, keyExists) + } + } +} diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD index dda12fa969f..be4367841ed 100644 --- a/pkg/controller/cloud/BUILD +++ b/pkg/controller/cloud/BUILD @@ -15,12 +15,15 @@ go_library( importpath = "k8s.io/kubernetes/pkg/controller/cloud", deps = [ "//pkg/api/v1/node:go_default_library", + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/util/node:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/util/node:go_default_library", + "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -30,6 +33,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", @@ -57,11 +61,13 @@ go_test( "//pkg/controller/testutil:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/scheduler/algorithm:go_default_library", + "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", diff --git a/pkg/controller/cloud/pvlcontroller.go b/pkg/controller/cloud/pvlcontroller.go index aaf34a0e020..355e5d27de2 100644 --- a/pkg/controller/cloud/pvlcontroller.go +++ b/pkg/controller/cloud/pvlcontroller.go @@ -34,6 +34,11 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + utilfeature "k8s.io/apiserver/pkg/util/feature" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + volumeutil "k8s.io/kubernetes/pkg/volume/util" "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" @@ -70,7 +75,7 @@ func NewPersistentVolumeLabelController( kubeClient: kubeClient, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"), } - pvlc.syncHandler = pvlc.addLabels + pvlc.syncHandler = pvlc.addLabelsAndAffinity pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -166,7 +171,7 @@ func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool { // AddLabels adds appropriate labels to persistent volumes and sets the // volume as available if successful. -func (pvlc *PersistentVolumeLabelController) addLabels(key string) error { +func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinity(key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err) @@ -178,10 +183,10 @@ func (pvlc *PersistentVolumeLabelController) addLabels(key string) error { return fmt.Errorf("error getting volume %s from informer: %v", name, err) } - return pvlc.addLabelsToVolume(volume) + return pvlc.addLabelsAndAffinityToVolume(volume) } -func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.PersistentVolume) error { +func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinityToVolume(vol *v1.PersistentVolume) error { var volumeLabels map[string]string // Only add labels if the next pending initializer. if needsInitialization(vol.Initializers, initializerName) { @@ -202,11 +207,52 @@ func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.Persisten func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) { volName := vol.Name newVolume := vol.DeepCopyObject().(*v1.PersistentVolume) + populateAffinity := utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) && len(volLabels) != 0 + if newVolume.Labels == nil { newVolume.Labels = make(map[string]string) } + + requirements := make([]v1.NodeSelectorRequirement, 0) for k, v := range volLabels { newVolume.Labels[k] = v + // Set NodeSelectorRequirements based on the labels + if populateAffinity { + var values []string + if k == kubeletapis.LabelZoneFailureDomain { + zones, err := volumeutil.LabelZonesToSet(v) + if err != nil { + return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v) + } + values = zones.List() + } else { + values = []string{v} + } + requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values}) + } + } + if populateAffinity { + if newVolume.Spec.NodeAffinity == nil { + newVolume.Spec.NodeAffinity = new(v1.VolumeNodeAffinity) + } + if newVolume.Spec.NodeAffinity.Required == nil { + newVolume.Spec.NodeAffinity.Required = new(v1.NodeSelector) + } + if len(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { + // Need atleast one term pre-allocated whose MatchExpressions can be appended to + newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1) + } + // Populate NodeAffinity with requirements if there are no conflicting keys found + if v1helper.NodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) { + glog.V(4).Info("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.", + requirements, newVolume.Spec.NodeAffinity) + } else { + for _, req := range requirements { + for i := range newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms { + newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req) + } + } + } } newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName) glog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name) diff --git a/pkg/controller/cloud/pvlcontroller_test.go b/pkg/controller/cloud/pvlcontroller_test.go index 2ab2f11394d..b5fa31a284a 100644 --- a/pkg/controller/cloud/pvlcontroller_test.go +++ b/pkg/controller/cloud/pvlcontroller_test.go @@ -18,6 +18,7 @@ package cloud import ( "encoding/json" + "reflect" "testing" "time" @@ -26,8 +27,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + volumeutil "k8s.io/kubernetes/pkg/volume/util" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" ) @@ -71,29 +75,292 @@ func TestCreatePatch(t *testing.T) { }, }, } - - testCases := map[string]struct { - vol v1.PersistentVolume - labels map[string]string - }{ - "non-cloud PV": { - vol: ignoredPV, - labels: nil, + expectedAffinitya1b2MergedWithAWSPV := v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "a", + Operator: v1.NodeSelectorOpIn, + Values: []string{"1"}, + }, + { + Key: "b", + Operator: v1.NodeSelectorOpIn, + Values: []string{"2"}, + }, + }, + }, + }, }, - "no labels": { - vol: awsPV, - labels: nil, + } + expectedAffinityZone1MergedWithAWSPV := v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelZoneFailureDomain, + Operator: v1.NodeSelectorOpIn, + Values: []string{"1"}, + }, + }, + }, + }, }, - "cloudprovider returns nil, nil": { - vol: awsPV, - labels: nil, + } + expectedAffinityZonesMergedWithAWSPV := v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelZoneFailureDomain, + Operator: v1.NodeSelectorOpIn, + Values: []string{"1", "2", "3"}, + }, + }, + }, + }, }, - "cloudprovider labels": { - vol: awsPV, - labels: map[string]string{"a": "1", "b": "2"}, + } + awsPVWithAffinity := v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "awsPV", + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{ + { + Name: initializerName, + }, + }, + }, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: "123", + }, + }, + NodeAffinity: &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "c", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val1", "val2"}, + }, + { + Key: "d", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val3"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "e", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val4", "val5"}, + }, + }, + }, + }, + }, + }, + }, + } + expectedAffinitya1b2MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "c", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val1", "val2"}, + }, + { + Key: "d", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val3"}, + }, + { + Key: "a", + Operator: v1.NodeSelectorOpIn, + Values: []string{"1"}, + }, + { + Key: "b", + Operator: v1.NodeSelectorOpIn, + Values: []string{"2"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "e", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val4", "val5"}, + }, + { + Key: "a", + Operator: v1.NodeSelectorOpIn, + Values: []string{"1"}, + }, + { + Key: "b", + Operator: v1.NodeSelectorOpIn, + Values: []string{"2"}, + }, + }, + }, + }, + }, + } + expectedAffinityZone1MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "c", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val1", "val2"}, + }, + { + Key: "d", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val3"}, + }, + { + Key: kubeletapis.LabelZoneFailureDomain, + Operator: v1.NodeSelectorOpIn, + Values: []string{"1"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "e", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val4", "val5"}, + }, + { + Key: kubeletapis.LabelZoneFailureDomain, + Operator: v1.NodeSelectorOpIn, + Values: []string{"1"}, + }, + }, + }, + }, + }, + } + expectedAffinityZonesMergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "c", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val1", "val2"}, + }, + { + Key: "d", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val3"}, + }, + { + Key: kubeletapis.LabelZoneFailureDomain, + Operator: v1.NodeSelectorOpIn, + Values: []string{"1", "2", "3"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "e", + Operator: v1.NodeSelectorOpIn, + Values: []string{"val4", "val5"}, + }, + { + Key: kubeletapis.LabelZoneFailureDomain, + Operator: v1.NodeSelectorOpIn, + Values: []string{"1", "2", "3"}, + }, + }, + }, + }, }, } + zones, _ := volumeutil.ZonesToSet("1,2,3") + testCases := map[string]struct { + vol v1.PersistentVolume + labels map[string]string + expectedAffinity *v1.VolumeNodeAffinity + }{ + "non-cloud PV": { + vol: ignoredPV, + labels: nil, + expectedAffinity: nil, + }, + "no labels": { + vol: awsPV, + labels: nil, + expectedAffinity: nil, + }, + "cloudprovider returns nil, nil": { + vol: awsPV, + labels: nil, + expectedAffinity: nil, + }, + "cloudprovider labels": { + vol: awsPV, + labels: map[string]string{"a": "1", "b": "2"}, + expectedAffinity: &expectedAffinitya1b2MergedWithAWSPV, + }, + "cloudprovider labels pre-existing affinity non-conflicting": { + vol: awsPVWithAffinity, + labels: map[string]string{"a": "1", "b": "2"}, + expectedAffinity: &expectedAffinitya1b2MergedWithAWSPVWithAffinity, + }, + "cloudprovider labels pre-existing affinity conflicting": { + vol: awsPVWithAffinity, + labels: map[string]string{"a": "1", "c": "2"}, + expectedAffinity: nil, + }, + "cloudprovider singlezone": { + vol: awsPV, + labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"}, + expectedAffinity: &expectedAffinityZone1MergedWithAWSPV, + }, + "cloudprovider singlezone pre-existing affinity non-conflicting": { + vol: awsPVWithAffinity, + labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"}, + expectedAffinity: &expectedAffinityZone1MergedWithAWSPVWithAffinity, + }, + "cloudprovider multizone": { + vol: awsPV, + labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)}, + expectedAffinity: &expectedAffinityZonesMergedWithAWSPV, + }, + "cloudprovider multizone pre-existing affinity non-conflicting": { + vol: awsPVWithAffinity, + labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)}, + expectedAffinity: &expectedAffinityZonesMergedWithAWSPVWithAffinity, + }, + } + + utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true") + defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false") for d, tc := range testCases { cloud := &fakecloud.FakeCloud{} client := fake.NewSimpleClientset() @@ -104,16 +371,20 @@ func TestCreatePatch(t *testing.T) { } obj := &v1.PersistentVolume{} json.Unmarshal(patch, obj) - if tc.labels != nil { - for k, v := range tc.labels { - if obj.ObjectMeta.Labels[k] != v { - t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k]) - } - } - } if obj.ObjectMeta.Initializers != nil { t.Errorf("%s: initializer wasn't removed: %v", d, obj.ObjectMeta.Initializers) } + if tc.labels == nil { + continue + } + for k, v := range tc.labels { + if obj.ObjectMeta.Labels[k] != v { + t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k]) + } + } + if !reflect.DeepEqual(tc.expectedAffinity, obj.Spec.NodeAffinity) { + t.Errorf("Expected affinity %v does not match target affinity %v", tc.expectedAffinity, obj.Spec.NodeAffinity) + } } } @@ -132,32 +403,35 @@ func TestAddLabelsToVolume(t *testing.T) { } testCases := map[string]struct { - vol v1.PersistentVolume - initializers *metav1.Initializers - shouldLabel bool + vol v1.PersistentVolume + initializers *metav1.Initializers + shouldLabelAndSetAffinity bool }{ "PV without initializer": { - vol: pv, - initializers: nil, - shouldLabel: false, + vol: pv, + initializers: nil, + shouldLabelAndSetAffinity: false, }, "PV with initializer to remove": { - vol: pv, - initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}}, - shouldLabel: true, + vol: pv, + initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}}, + shouldLabelAndSetAffinity: true, }, "PV with other initializers only": { - vol: pv, - initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}}, - shouldLabel: false, + vol: pv, + initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}}, + shouldLabelAndSetAffinity: false, }, "PV with other initializers first": { - vol: pv, - initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}, {Name: initializerName}}}, - shouldLabel: false, + vol: pv, + initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}, {Name: initializerName}}}, + shouldLabelAndSetAffinity: false, }, } + utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true") + defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false") + for d, tc := range testCases { labeledCh := make(chan bool, 1) client := fake.NewSimpleClientset() @@ -168,6 +442,22 @@ func TestAddLabelsToVolume(t *testing.T) { if obj.ObjectMeta.Labels["a"] != "1" { return false, nil, nil } + if obj.Spec.NodeAffinity == nil { + return false, nil, nil + } + if obj.Spec.NodeAffinity.Required == nil { + return false, nil, nil + } + if len(obj.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { + return false, nil, nil + } + reqs := obj.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions + if len(reqs) != 1 { + return false, nil, nil + } + if reqs[0].Key != "a" || reqs[0].Values[0] != "1" || reqs[0].Operator != v1.NodeSelectorOpIn { + return false, nil, nil + } labeledCh <- true return true, nil, nil }) @@ -177,16 +467,16 @@ func TestAddLabelsToVolume(t *testing.T) { } pvlController := &PersistentVolumeLabelController{kubeClient: client, cloud: fakeCloud} tc.vol.ObjectMeta.Initializers = tc.initializers - pvlController.addLabelsToVolume(&tc.vol) + pvlController.addLabelsAndAffinityToVolume(&tc.vol) select { case l := <-labeledCh: - if l != tc.shouldLabel { - t.Errorf("%s: label of pv failed. expected %t got %t", d, tc.shouldLabel, l) + if l != tc.shouldLabelAndSetAffinity { + t.Errorf("%s: label and affinity setting of pv failed. expected %t got %t", d, tc.shouldLabelAndSetAffinity, l) } case <-time.After(500 * time.Millisecond): - if tc.shouldLabel != false { - t.Errorf("%s: timed out waiting for label notification", d) + if tc.shouldLabelAndSetAffinity != false { + t.Errorf("%s: timed out waiting for label and affinity setting notification", d) } } } diff --git a/plugin/pkg/admission/storage/persistentvolume/label/BUILD b/plugin/pkg/admission/storage/persistentvolume/label/BUILD index 7fe77fc9d93..2674d29ebd0 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/BUILD +++ b/plugin/pkg/admission/storage/persistentvolume/label/BUILD @@ -18,10 +18,13 @@ go_library( "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/aws:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) @@ -33,10 +36,13 @@ go_test( deps = [ "//pkg/apis/core:go_default_library", "//pkg/cloudprovider/providers/aws:go_default_library", + "//pkg/kubelet/apis:go_default_library", + "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", ], ) diff --git a/plugin/pkg/admission/storage/persistentvolume/label/admission.go b/plugin/pkg/admission/storage/persistentvolume/label/admission.go index 19fc962ca32..09c70df8882 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/admission.go +++ b/plugin/pkg/admission/storage/persistentvolume/label/admission.go @@ -24,13 +24,16 @@ import ( "github.com/golang/glog" "k8s.io/apiserver/pkg/admission" + utilfeature "k8s.io/apiserver/pkg/util/feature" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + "k8s.io/kubernetes/pkg/features" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" vol "k8s.io/kubernetes/pkg/volume" + volumeutil "k8s.io/kubernetes/pkg/volume/util" ) const ( @@ -79,6 +82,19 @@ func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) { l.cloudConfig = cloudConfig } +func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool { + for _, req := range reqs { + for _, term := range terms { + for _, r := range term.MatchExpressions { + if r.Key == req.Key { + return true + } + } + } + } + return false +} + func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) { if a.GetResource().GroupResource() != api.Resource("persistentvolumes") { return nil @@ -108,6 +124,7 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) { volumeLabels = labels } + requirements := make([]api.NodeSelectorRequirement, 0) if len(volumeLabels) != 0 { if volume.Labels == nil { volume.Labels = make(map[string]string) @@ -117,6 +134,42 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) { // This should be OK because they are in the kubernetes.io namespace // i.e. we own them volume.Labels[k] = v + + // Set NodeSelectorRequirements based on the labels + var values []string + if k == kubeletapis.LabelZoneFailureDomain { + zones, err := volumeutil.LabelZonesToSet(v) + if err != nil { + return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v)) + } + values = zones.UnsortedList() + } else { + values = []string{v} + } + requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values}) + } + + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + if volume.Spec.NodeAffinity == nil { + volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity) + } + if volume.Spec.NodeAffinity.Required == nil { + volume.Spec.NodeAffinity.Required = new(api.NodeSelector) + } + if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { + // Need atleast one term pre-allocated whose MatchExpressions can be appended to + volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1) + } + if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) { + glog.V(4).Info("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.", + requirements, volume.Spec.NodeAffinity) + } else { + for _, req := range requirements { + for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms { + volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req) + } + } + } } } diff --git a/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go b/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go index f04939152e2..965e9a3a3e9 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go +++ b/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go @@ -20,13 +20,18 @@ import ( "testing" "fmt" + "reflect" + "sort" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/admission" + utilfeature "k8s.io/apiserver/pkg/util/feature" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + volumeutil "k8s.io/kubernetes/pkg/volume/util" ) type mockVolumes struct { @@ -83,6 +88,16 @@ func mockVolumeLabels(labels map[string]string) *mockVolumes { return &mockVolumes{volumeLabels: labels} } +func getNodeSelectorRequirementWithKey(key string, term api.NodeSelectorTerm) (*api.NodeSelectorRequirement, error) { + for _, r := range term.MatchExpressions { + if r.Key != key { + continue + } + return &r, nil + } + return nil, fmt.Errorf("key %s not found", key) +} + // TestAdmission func TestAdmission(t *testing.T) { pvHandler := newPersistentVolumeLabel() @@ -107,6 +122,8 @@ func TestAdmission(t *testing.T) { }, }, } + utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true") + defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false") // Non-cloud PVs are ignored err := handler.Admit(admission.NewAttributesRecord(&ignoredPV, nil, api.Kind("PersistentVolume").WithVersion("version"), ignoredPV.Namespace, ignoredPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil)) @@ -137,6 +154,9 @@ func TestAdmission(t *testing.T) { if len(awsPV.ObjectMeta.Labels) != 0 { t.Errorf("Unexpected number of labels") } + if awsPV.Spec.NodeAffinity != nil { + t.Errorf("Unexpected NodeAffinity found") + } // Don't panic if the cloudprovider returns nil, nil pvHandler.ebsVolumes = mockVolumeFailure(nil) @@ -145,17 +165,45 @@ func TestAdmission(t *testing.T) { t.Errorf("Expected no error when cloud provider returns empty labels") } - // Labels from the cloudprovider should be applied to the volume + // Labels from the cloudprovider should be applied to the volume as labels and node affinity expressions labels = make(map[string]string) labels["a"] = "1" labels["b"] = "2" + zones, _ := volumeutil.ZonesToSet("1,2,3") + labels[kubeletapis.LabelZoneFailureDomain] = volumeutil.ZonesSetToLabelValue(zones) pvHandler.ebsVolumes = mockVolumeLabels(labels) err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil)) if err != nil { t.Errorf("Expected no error when creating aws pv") } if awsPV.Labels["a"] != "1" || awsPV.Labels["b"] != "2" { - t.Errorf("Expected label a to be added when creating aws pv") + t.Errorf("Expected label a and b to be added when creating aws pv") + } + if awsPV.Spec.NodeAffinity == nil { + t.Errorf("Unexpected nil NodeAffinity found") + } + if len(awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms) != 1 { + t.Errorf("Unexpected number of NodeSelectorTerms") + } + term := awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0] + if len(term.MatchExpressions) != 3 { + t.Errorf("Unexpected number of NodeSelectorRequirements in volume NodeAffinity: %d", len(term.MatchExpressions)) + } + r, _ := getNodeSelectorRequirementWithKey("a", term) + if r == nil || r.Values[0] != "1" || r.Operator != api.NodeSelectorOpIn { + t.Errorf("NodeSelectorRequirement a-in-1 not found in volume NodeAffinity") + } + r, _ = getNodeSelectorRequirementWithKey("b", term) + if r == nil || r.Values[0] != "2" || r.Operator != api.NodeSelectorOpIn { + t.Errorf("NodeSelectorRequirement b-in-2 not found in volume NodeAffinity") + } + r, _ = getNodeSelectorRequirementWithKey(kubeletapis.LabelZoneFailureDomain, term) + if r == nil { + t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", kubeletapis.LabelZoneFailureDomain, zones) + } + sort.Strings(r.Values) + if !reflect.DeepEqual(r.Values, zones.List()) { + t.Errorf("ZoneFailureDomain elements %v does not match zone labels %v", r.Values, zones) } // User-provided labels should be honored, but cloudprovider labels replace them when they overlap @@ -173,4 +221,50 @@ func TestAdmission(t *testing.T) { t.Errorf("Expected (non-conflicting) user provided labels to be honored when creating aws pv") } + // if a conflicting affinity is already specified, leave affinity in-tact + labels = make(map[string]string) + labels["a"] = "1" + labels["b"] = "2" + labels["c"] = "3" + pvHandler.ebsVolumes = mockVolumeLabels(labels) + err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("Expected no error when creating aws pv") + } + if awsPV.Spec.NodeAffinity == nil { + t.Errorf("Unexpected nil NodeAffinity found") + } + if awsPV.Spec.NodeAffinity.Required == nil { + t.Errorf("Unexpected nil NodeAffinity.Required %v", awsPV.Spec.NodeAffinity.Required) + } + r, _ = getNodeSelectorRequirementWithKey("c", awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0]) + if r != nil { + t.Errorf("NodeSelectorRequirement c not expected in volume NodeAffinity") + } + + // if a non-conflicting affinity is specified, check for new affinity being added + labels = make(map[string]string) + labels["e"] = "1" + labels["f"] = "2" + labels["g"] = "3" + pvHandler.ebsVolumes = mockVolumeLabels(labels) + err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("Expected no error when creating aws pv") + } + if awsPV.Spec.NodeAffinity == nil { + t.Errorf("Unexpected nil NodeAffinity found") + } + if awsPV.Spec.NodeAffinity.Required == nil { + t.Errorf("Unexpected nil NodeAffinity.Required %v", awsPV.Spec.NodeAffinity.Required) + } + // populate old entries + labels["a"] = "1" + labels["b"] = "2" + for k, v := range labels { + r, _ = getNodeSelectorRequirementWithKey(k, awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0]) + if r == nil || r.Values[0] != v || r.Operator != api.NodeSelectorOpIn { + t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", k, v) + } + } }