Merge pull request #64226 from ddebroy/ddebroy-affinity1

Automatic merge from submit-queue (batch tested with PRs 64226, 65880). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Populate NodeAffinity on top of labels for cloud based PersistentVolumes

**What this PR does / why we need it**:

This PR populates the NodeAffinity field (on top of the existing labels) for PVs backed by cloud providers like EC2 EBS and GCE PD.

**Special notes for your reviewer**:
Related to https://github.com/kubernetes/kubernetes/pull/63232

Sample `describe pv` output for EBS with node affinity field populated:
```
kubectl describe pv pv0001
Name:              pv0001
Labels:            failure-domain.beta.kubernetes.io/region=us-west-2
                   failure-domain.beta.kubernetes.io/zone=us-west-2a
Annotations:       <none>
Finalizers:        [kubernetes.io/pv-protection]
StorageClass:      
Status:            Available
Claim:             
Reclaim Policy:    Retain
Access Modes:      RWO
Capacity:          5Gi
Node Affinity:     
  Required Terms:  
    Term 0:        failure-domain.beta.kubernetes.io/zone in [us-west-2a]
                   failure-domain.beta.kubernetes.io/region in [us-west-2]
Message:           
Source:
    Type:       AWSElasticBlockStore (a Persistent Disk resource in AWS)
    VolumeID:   vol-00cf03a068c62cbe6
    FSType:     ext4
    Partition:  0
    ReadOnly:   false
Events:         <none>
```

/sig storage
/assign @msau42

**Release note**:
```NONE```
This commit is contained in:
Kubernetes Submit Queue
2018-07-09 12:16:02 -07:00
committed by GitHub
8 changed files with 749 additions and 50 deletions

View File

@@ -18,10 +18,13 @@ go_library(
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
@@ -33,10 +36,13 @@ go_test(
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)

View File

@@ -24,13 +24,16 @@ import (
"github.com/golang/glog"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/features"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
vol "k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
@@ -79,6 +82,19 @@ func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
l.cloudConfig = cloudConfig
}
func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool {
for _, req := range reqs {
for _, term := range terms {
for _, r := range term.MatchExpressions {
if r.Key == req.Key {
return true
}
}
}
}
return false
}
func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
return nil
@@ -108,6 +124,7 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
volumeLabels = labels
}
requirements := make([]api.NodeSelectorRequirement, 0)
if len(volumeLabels) != 0 {
if volume.Labels == nil {
volume.Labels = make(map[string]string)
@@ -117,6 +134,42 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
// This should be OK because they are in the kubernetes.io namespace
// i.e. we own them
volume.Labels[k] = v
// Set NodeSelectorRequirements based on the labels
var values []string
if k == kubeletapis.LabelZoneFailureDomain {
zones, err := volumeutil.LabelZonesToSet(v)
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
}
values = zones.UnsortedList()
} else {
values = []string{v}
}
requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values})
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
if volume.Spec.NodeAffinity == nil {
volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity)
}
if volume.Spec.NodeAffinity.Required == nil {
volume.Spec.NodeAffinity.Required = new(api.NodeSelector)
}
if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
// Need atleast one term pre-allocated whose MatchExpressions can be appended to
volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1)
}
if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
glog.V(4).Info("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
requirements, volume.Spec.NodeAffinity)
} else {
for _, req := range requirements {
for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms {
volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
}
}
}
}
}

View File

@@ -20,13 +20,18 @@ import (
"testing"
"fmt"
"reflect"
"sort"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
type mockVolumes struct {
@@ -83,6 +88,16 @@ func mockVolumeLabels(labels map[string]string) *mockVolumes {
return &mockVolumes{volumeLabels: labels}
}
func getNodeSelectorRequirementWithKey(key string, term api.NodeSelectorTerm) (*api.NodeSelectorRequirement, error) {
for _, r := range term.MatchExpressions {
if r.Key != key {
continue
}
return &r, nil
}
return nil, fmt.Errorf("key %s not found", key)
}
// TestAdmission
func TestAdmission(t *testing.T) {
pvHandler := newPersistentVolumeLabel()
@@ -107,6 +122,8 @@ func TestAdmission(t *testing.T) {
},
},
}
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
// Non-cloud PVs are ignored
err := handler.Admit(admission.NewAttributesRecord(&ignoredPV, nil, api.Kind("PersistentVolume").WithVersion("version"), ignoredPV.Namespace, ignoredPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
@@ -137,6 +154,9 @@ func TestAdmission(t *testing.T) {
if len(awsPV.ObjectMeta.Labels) != 0 {
t.Errorf("Unexpected number of labels")
}
if awsPV.Spec.NodeAffinity != nil {
t.Errorf("Unexpected NodeAffinity found")
}
// Don't panic if the cloudprovider returns nil, nil
pvHandler.ebsVolumes = mockVolumeFailure(nil)
@@ -145,17 +165,45 @@ func TestAdmission(t *testing.T) {
t.Errorf("Expected no error when cloud provider returns empty labels")
}
// Labels from the cloudprovider should be applied to the volume
// Labels from the cloudprovider should be applied to the volume as labels and node affinity expressions
labels = make(map[string]string)
labels["a"] = "1"
labels["b"] = "2"
zones, _ := volumeutil.ZonesToSet("1,2,3")
labels[kubeletapis.LabelZoneFailureDomain] = volumeutil.ZonesSetToLabelValue(zones)
pvHandler.ebsVolumes = mockVolumeLabels(labels)
err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Expected no error when creating aws pv")
}
if awsPV.Labels["a"] != "1" || awsPV.Labels["b"] != "2" {
t.Errorf("Expected label a to be added when creating aws pv")
t.Errorf("Expected label a and b to be added when creating aws pv")
}
if awsPV.Spec.NodeAffinity == nil {
t.Errorf("Unexpected nil NodeAffinity found")
}
if len(awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms) != 1 {
t.Errorf("Unexpected number of NodeSelectorTerms")
}
term := awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0]
if len(term.MatchExpressions) != 3 {
t.Errorf("Unexpected number of NodeSelectorRequirements in volume NodeAffinity: %d", len(term.MatchExpressions))
}
r, _ := getNodeSelectorRequirementWithKey("a", term)
if r == nil || r.Values[0] != "1" || r.Operator != api.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement a-in-1 not found in volume NodeAffinity")
}
r, _ = getNodeSelectorRequirementWithKey("b", term)
if r == nil || r.Values[0] != "2" || r.Operator != api.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement b-in-2 not found in volume NodeAffinity")
}
r, _ = getNodeSelectorRequirementWithKey(kubeletapis.LabelZoneFailureDomain, term)
if r == nil {
t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", kubeletapis.LabelZoneFailureDomain, zones)
}
sort.Strings(r.Values)
if !reflect.DeepEqual(r.Values, zones.List()) {
t.Errorf("ZoneFailureDomain elements %v does not match zone labels %v", r.Values, zones)
}
// User-provided labels should be honored, but cloudprovider labels replace them when they overlap
@@ -173,4 +221,50 @@ func TestAdmission(t *testing.T) {
t.Errorf("Expected (non-conflicting) user provided labels to be honored when creating aws pv")
}
// if a conflicting affinity is already specified, leave affinity in-tact
labels = make(map[string]string)
labels["a"] = "1"
labels["b"] = "2"
labels["c"] = "3"
pvHandler.ebsVolumes = mockVolumeLabels(labels)
err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Expected no error when creating aws pv")
}
if awsPV.Spec.NodeAffinity == nil {
t.Errorf("Unexpected nil NodeAffinity found")
}
if awsPV.Spec.NodeAffinity.Required == nil {
t.Errorf("Unexpected nil NodeAffinity.Required %v", awsPV.Spec.NodeAffinity.Required)
}
r, _ = getNodeSelectorRequirementWithKey("c", awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0])
if r != nil {
t.Errorf("NodeSelectorRequirement c not expected in volume NodeAffinity")
}
// if a non-conflicting affinity is specified, check for new affinity being added
labels = make(map[string]string)
labels["e"] = "1"
labels["f"] = "2"
labels["g"] = "3"
pvHandler.ebsVolumes = mockVolumeLabels(labels)
err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Expected no error when creating aws pv")
}
if awsPV.Spec.NodeAffinity == nil {
t.Errorf("Unexpected nil NodeAffinity found")
}
if awsPV.Spec.NodeAffinity.Required == nil {
t.Errorf("Unexpected nil NodeAffinity.Required %v", awsPV.Spec.NodeAffinity.Required)
}
// populate old entries
labels["a"] = "1"
labels["b"] = "2"
for k, v := range labels {
r, _ = getNodeSelectorRequirementWithKey(k, awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0])
if r == nil || r.Values[0] != v || r.Operator != api.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", k, v)
}
}
}