diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index afdf3e5122a..26710bbb928 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -273,7 +273,6 @@ func newControllerInitializers() map[string]initFunc { controllers := map[string]initFunc{} controllers["cloud-node"] = startCloudNodeController controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController - controllers["persistentvolume-binder"] = startPersistentVolumeLabelController controllers["service"] = startServiceController controllers["route"] = startRouteController return controllers diff --git a/cmd/cloud-controller-manager/app/core.go b/cmd/cloud-controller-manager/app/core.go index 628237bcb40..1b79fb9a93d 100644 --- a/cmd/cloud-controller-manager/app/core.go +++ b/cmd/cloud-controller-manager/app/core.go @@ -66,17 +66,6 @@ func startCloudNodeLifecycleController(ctx *cloudcontrollerconfig.CompletedConfi return nil, true, nil } -func startPersistentVolumeLabelController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { - // Start the PersistentVolumeLabelController - pvlController := cloudcontrollers.NewPersistentVolumeLabelController( - ctx.ClientBuilder.ClientOrDie("pvl-controller"), - cloud, - ) - go pvlController.Run(5, stopCh) - - return nil, true, nil -} - func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { // Start the service controller serviceController, err := servicecontroller.New( diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD index 327a8ab7411..4ff7b17aad8 100644 --- a/pkg/controller/cloud/BUILD +++ b/pkg/controller/cloud/BUILD @@ -11,26 +11,20 @@ go_library( srcs = [ "node_controller.go", "node_lifecycle_controller.go", - "pvlcontroller.go", ], importpath = "k8s.io/kubernetes/pkg/controller/cloud", deps = [ - "//pkg/apis/core/v1/helper:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/util/node:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", @@ -39,9 +33,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", - "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", - "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -51,7 +43,6 @@ go_test( srcs = [ "node_controller_test.go", "node_lifecycle_controller_test.go", - "pvlcontroller_test.go", ], embed = [":go_default_library"], deps = [ @@ -62,17 +53,13 @@ go_test( "//pkg/scheduler/api:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", - "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/controller/cloud/pvlcontroller.go b/pkg/controller/cloud/pvlcontroller.go deleted file mode 100644 index 3bb7dc72a3b..00000000000 --- a/pkg/controller/cloud/pvlcontroller.go +++ /dev/null @@ -1,293 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "k8s.io/klog" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" - cloudprovider "k8s.io/cloud-provider" - volumehelpers "k8s.io/cloud-provider/volume/helpers" - v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" - "k8s.io/kubernetes/pkg/controller" -) - -// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created -type PersistentVolumeLabelController struct { - cloud cloudprovider.Interface - kubeClient kubernetes.Interface - pvlController cache.Controller - pvlIndexer cache.Indexer - volumeLister corelisters.PersistentVolumeLister - - syncHandler func(key string) error - - // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors - queue workqueue.RateLimitingInterface -} - -// NewPersistentVolumeLabelController creates a PersistentVolumeLabelController object -func NewPersistentVolumeLabelController( - kubeClient kubernetes.Interface, - cloud cloudprovider.Interface) *PersistentVolumeLabelController { - - pvlc := &PersistentVolumeLabelController{ - cloud: cloud, - kubeClient: kubeClient, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"), - } - pvlc.syncHandler = pvlc.addLabelsAndAffinity - pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CoreV1().PersistentVolumes().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CoreV1().PersistentVolumes().Watch(options) - }, - }, - &v1.PersistentVolume{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err == nil { - pvlc.queue.Add(key) - } - }, - }, - cache.Indexers{}, - ) - pvlc.volumeLister = corelisters.NewPersistentVolumeLister(pvlc.pvlIndexer) - - return pvlc -} - -// Run starts a controller that adds labels to persistent volumes -func (pvlc *PersistentVolumeLabelController) Run(threadiness int, stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - defer pvlc.queue.ShutDown() - - klog.Infof("Starting PersistentVolumeLabelController") - defer klog.Infof("Shutting down PersistentVolumeLabelController") - - go pvlc.pvlController.Run(stopCh) - - if !controller.WaitForCacheSync("persistent volume label", stopCh, pvlc.pvlController.HasSynced) { - return - } - - // start up your worker threads based on threadiness. Some controllers have multiple kinds of workers - for i := 0; i < threadiness; i++ { - // runWorker will loop until "something bad" happens. The .Until will then rekick the worker - // after one second - go wait.Until(pvlc.runWorker, time.Second, stopCh) - } - - // wait until we're told to stop - <-stopCh -} - -func (pvlc *PersistentVolumeLabelController) runWorker() { - // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work - // available, so we don't worry about secondary waits - for pvlc.processNextWorkItem() { - } -} - -// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. -func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool { - // pull the next work item from queue. It should be a key we use to lookup something in a cache - keyObj, quit := pvlc.queue.Get() - if quit { - return false - } - // you always have to indicate to the queue that you've completed a piece of work - defer pvlc.queue.Done(keyObj) - - key := keyObj.(string) - // do your work on the key. This method will contains your "do stuff" logic - err := pvlc.syncHandler(key) - if err == nil { - // if you had no error, tell the queue to stop tracking history for your key. This will - // reset things like failure counts for per-item rate limiting - pvlc.queue.Forget(key) - return true - } - - // there was a failure so be sure to report it. This method allows for pluggable error handling - // which can be used for things like cluster-monitoring - utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) - - // since we failed, we should requeue the item to work on later. This method will add a backoff - // to avoid hotlooping on particular items (they're probably still not going to work right away) - // and overall controller protection (everything I've done is broken, this controller needs to - // calm down or it can starve other useful work) cases. - pvlc.queue.AddRateLimited(key) - - return true -} - -// AddLabels adds appropriate labels to persistent volumes and sets the -// volume as available if successful. -func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinity(key string) error { - _, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err) - } - volume, err := pvlc.volumeLister.Get(name) - if errors.IsNotFound(err) { - return nil - } else if err != nil { - return fmt.Errorf("error getting volume %s from informer: %v", name, err) - } - - return pvlc.addLabelsAndAffinityToVolume(volume) -} - -func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinityToVolume(vol *v1.PersistentVolume) error { - var volumeLabels map[string]string - // Only add labels if the next pending initializer. - if needsInitialization(vol) { - if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok { - labels, err := labeler.GetLabelsForVolume(context.TODO(), vol) - if err != nil { - return fmt.Errorf("error querying volume %v: %v", vol.Spec, err) - } - volumeLabels = labels - } else { - klog.V(4).Info("cloud provider does not support PVLabeler") - } - return pvlc.updateVolume(vol, volumeLabels) - } - return nil -} - -func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) { - volName := vol.Name - newVolume := vol.DeepCopyObject().(*v1.PersistentVolume) - populateAffinity := len(volLabels) != 0 - - if newVolume.Labels == nil { - newVolume.Labels = make(map[string]string) - } - - requirements := make([]v1.NodeSelectorRequirement, 0) - for k, v := range volLabels { - newVolume.Labels[k] = v - // Set NodeSelectorRequirements based on the labels - if populateAffinity { - var values []string - if k == v1.LabelZoneFailureDomain { - zones, err := volumehelpers.LabelZonesToSet(v) - if err != nil { - return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v) - } - values = zones.List() - } else { - values = []string{v} - } - requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values}) - } - } - if populateAffinity { - if newVolume.Spec.NodeAffinity == nil { - newVolume.Spec.NodeAffinity = new(v1.VolumeNodeAffinity) - } - if newVolume.Spec.NodeAffinity.Required == nil { - newVolume.Spec.NodeAffinity.Required = new(v1.NodeSelector) - } - if len(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { - // Need at least one term pre-allocated whose MatchExpressions can be appended to - newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1) - } - // Populate NodeAffinity with requirements if there are no conflicting keys found - if v1helper.NodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) { - klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.", - requirements, newVolume.Spec.NodeAffinity) - } else { - for _, req := range requirements { - for i := range newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms { - newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req) - } - } - } - } - markInitialized(newVolume) - klog.V(4).Infof("marked PersistentVolume %s initialized", newVolume.Name) - - oldData, err := json.Marshal(vol) - if err != nil { - return nil, fmt.Errorf("failed to marshal old persistentvolume %#v for persistentvolume %q: %v", vol, volName, err) - } - - newData, err := json.Marshal(newVolume) - if err != nil { - return nil, fmt.Errorf("failed to marshal new persistentvolume %#v for persistentvolume %q: %v", newVolume, volName, err) - } - - patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.PersistentVolume{}) - if err != nil { - return nil, fmt.Errorf("failed to create patch for persistentvolume %q: %v", volName, err) - } - return patch, nil -} - -func (pvlc *PersistentVolumeLabelController) updateVolume(vol *v1.PersistentVolume, volLabels map[string]string) error { - volName := vol.Name - klog.V(4).Infof("updating PersistentVolume %s", volName) - patchBytes, err := pvlc.createPatch(vol, volLabels) - if err != nil { - return err - } - - _, err = pvlc.kubeClient.CoreV1().PersistentVolumes().Patch(string(volName), types.StrategicMergePatchType, patchBytes) - if err != nil { - return fmt.Errorf("failed to update PersistentVolume %s: %v", volName, err) - } - klog.V(4).Infof("updated PersistentVolume %s", volName) - - return nil -} - -func markInitialized(vol *v1.PersistentVolume) { - // TODO: mark initialized using a different field, since initializers are not being promoted past alpha, or convert to an admission plugin -} - -// needsInitialization checks whether or not the PVL is the next pending initializer. -func needsInitialization(vol *v1.PersistentVolume) bool { - // TODO: determine whether initialization is required based on a different attribute, - // since initializers are not being promoted past alpha, or convert to an admission plugin - return false -} diff --git a/pkg/controller/cloud/pvlcontroller_test.go b/pkg/controller/cloud/pvlcontroller_test.go deleted file mode 100644 index 6be54477033..00000000000 --- a/pkg/controller/cloud/pvlcontroller_test.go +++ /dev/null @@ -1,542 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "encoding/json" - "testing" - "time" - - "k8s.io/api/core/v1" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - - sets "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/kubernetes/fake" - core "k8s.io/client-go/testing" - volumehelpers "k8s.io/cloud-provider/volume/helpers" - - fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" -) - -func nodeSelectorRequirementsEqual(r1, r2 v1.NodeSelectorRequirement) bool { - if r1.Key != r2.Key { - return false - } - if r1.Operator != r2.Operator { - return false - } - vals1 := sets.NewString(r1.Values...) - vals2 := sets.NewString(r2.Values...) - if vals1.Equal(vals2) { - return true - } - return false -} - -func nodeSelectorTermsEqual(t1, t2 v1.NodeSelectorTerm) bool { - exprs1 := t1.MatchExpressions - exprs2 := t2.MatchExpressions - fields1 := t1.MatchFields - fields2 := t2.MatchFields - if len(exprs1) != len(exprs2) { - return false - } - if len(fields1) != len(fields2) { - return false - } - match := func(reqs1, reqs2 []v1.NodeSelectorRequirement) bool { - for _, req1 := range reqs1 { - reqMatched := false - for _, req2 := range reqs2 { - if nodeSelectorRequirementsEqual(req1, req2) { - reqMatched = true - break - } - } - if !reqMatched { - return false - } - } - return true - } - return match(exprs1, exprs2) && match(exprs2, exprs1) && match(fields1, fields2) && match(fields2, fields1) -} - -// volumeNodeAffinitiesEqual performs a highly semantic comparison of two VolumeNodeAffinity data structures -// It ignores ordering of instances of NodeSelectorRequirements in a VolumeNodeAffinity's NodeSelectorTerms as well as -// orderding of strings in Values of NodeSelectorRequirements when matching two VolumeNodeAffinity structures. -// Note that in most equality functions, Go considers two slices to be not equal if the order of elements in a slice do not -// match - so reflect.DeepEqual as well as Semantic.DeepEqual do not work for comparing VolumeNodeAffinity semantically. -// e.g. these two NodeSelectorTerms are considered semantically equal by volumeNodeAffinitiesEqual -// &VolumeNodeAffinity{Required:&NodeSelector{NodeSelectorTerms:[{[{a In [1]} {b In [2 3]}] []}],},} -// &VolumeNodeAffinity{Required:&NodeSelector{NodeSelectorTerms:[{[{b In [3 2]} {a In [1]}] []}],},} -// TODO: move volumeNodeAffinitiesEqual to utils so other can use it too -func volumeNodeAffinitiesEqual(n1, n2 *v1.VolumeNodeAffinity) bool { - if (n1 == nil) != (n2 == nil) { - return false - } - if n1 == nil || n2 == nil { - return true - } - ns1 := n1.Required - ns2 := n2.Required - - if (ns1 == nil) != (ns2 == nil) { - return false - } - if (ns1 == nil) && (ns2 == nil) { - return true - } - if len(ns1.NodeSelectorTerms) != len(ns1.NodeSelectorTerms) { - return false - } - match := func(terms1, terms2 []v1.NodeSelectorTerm) bool { - for _, term1 := range terms1 { - termMatched := false - for _, term2 := range terms2 { - if nodeSelectorTermsEqual(term1, term2) { - termMatched = true - break - } - } - if !termMatched { - return false - } - } - return true - } - return match(ns1.NodeSelectorTerms, ns2.NodeSelectorTerms) && match(ns2.NodeSelectorTerms, ns1.NodeSelectorTerms) -} - -func TestCreatePatch(t *testing.T) { - ignoredPV := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "noncloud", - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: "/", - }, - }, - }, - } - awsPV := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "awsPV", - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ - VolumeID: "123", - }, - }, - }, - } - expectedAffinitya1b2MergedWithAWSPV := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "a", - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - { - Key: "b", - Operator: v1.NodeSelectorOpIn, - Values: []string{"2"}, - }, - }, - }, - }, - }, - } - expectedAffinityZone1MergedWithAWSPV := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: v1.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - }, - }, - }, - }, - } - expectedAffinityZonesMergedWithAWSPV := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: v1.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1", "2", "3"}, - }, - }, - }, - }, - }, - } - awsPVWithAffinity := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "awsPV", - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ - VolumeID: "123", - }, - }, - NodeAffinity: &v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val4", "val5"}, - }, - }, - }, - }, - }, - }, - }, - } - expectedAffinitya1b2MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - { - Key: "a", - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - { - Key: "b", - Operator: v1.NodeSelectorOpIn, - Values: []string{"2"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val4", "val5"}, - }, - { - Key: "a", - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - { - Key: "b", - Operator: v1.NodeSelectorOpIn, - Values: []string{"2"}, - }, - }, - }, - }, - }, - } - expectedAffinityZone1MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - { - Key: v1.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val4", "val5"}, - }, - { - Key: v1.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - }, - }, - }, - }, - } - expectedAffinityZonesMergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - { - Key: v1.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1", "2", "3"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val5", "val4"}, - }, - { - Key: v1.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"3", "2", "1"}, - }, - }, - }, - }, - }, - } - - zones, _ := volumehelpers.ZonesToSet("1,2,3") - testCases := map[string]struct { - vol v1.PersistentVolume - labels map[string]string - expectedAffinity *v1.VolumeNodeAffinity - }{ - "non-cloud PV": { - vol: ignoredPV, - labels: nil, - expectedAffinity: nil, - }, - "no labels": { - vol: awsPV, - labels: nil, - expectedAffinity: nil, - }, - "cloudprovider returns nil, nil": { - vol: awsPV, - labels: nil, - expectedAffinity: nil, - }, - "cloudprovider labels": { - vol: awsPV, - labels: map[string]string{"a": "1", "b": "2"}, - expectedAffinity: &expectedAffinitya1b2MergedWithAWSPV, - }, - "cloudprovider labels pre-existing affinity non-conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{"a": "1", "b": "2"}, - expectedAffinity: &expectedAffinitya1b2MergedWithAWSPVWithAffinity, - }, - "cloudprovider labels pre-existing affinity conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{"a": "1", "c": "2"}, - expectedAffinity: nil, - }, - "cloudprovider singlezone": { - vol: awsPV, - labels: map[string]string{v1.LabelZoneFailureDomain: "1"}, - expectedAffinity: &expectedAffinityZone1MergedWithAWSPV, - }, - "cloudprovider singlezone pre-existing affinity non-conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{v1.LabelZoneFailureDomain: "1"}, - expectedAffinity: &expectedAffinityZone1MergedWithAWSPVWithAffinity, - }, - "cloudprovider multizone": { - vol: awsPV, - labels: map[string]string{v1.LabelZoneFailureDomain: volumehelpers.ZonesSetToLabelValue(zones)}, - expectedAffinity: &expectedAffinityZonesMergedWithAWSPV, - }, - "cloudprovider multizone pre-existing affinity non-conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{v1.LabelZoneFailureDomain: volumehelpers.ZonesSetToLabelValue(zones)}, - expectedAffinity: &expectedAffinityZonesMergedWithAWSPVWithAffinity, - }, - } - - for d, tc := range testCases { - cloud := &fakecloud.FakeCloud{} - client := fake.NewSimpleClientset() - pvlController := NewPersistentVolumeLabelController(client, cloud) - patch, err := pvlController.createPatch(&tc.vol, tc.labels) - if err != nil { - t.Errorf("%s: createPatch returned err: %v", d, err) - } - obj := &v1.PersistentVolume{} - json.Unmarshal(patch, obj) - - // TODO: check if object was marked as initialized - // if ... object was not marked as initialized ... { - // t.Errorf("%s: wasn't marked as initialized: %#v", d, obj) - // } - - if tc.labels == nil { - continue - } - for k, v := range tc.labels { - if obj.ObjectMeta.Labels[k] != v { - t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k]) - } - } - if !volumeNodeAffinitiesEqual(tc.expectedAffinity, obj.Spec.NodeAffinity) { - t.Errorf("Expected affinity %v does not match target affinity %v", tc.expectedAffinity, obj.Spec.NodeAffinity) - } - } -} - -func TestAddLabelsToVolume(t *testing.T) { - pv := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "awsPV", - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ - VolumeID: "123", - }, - }, - }, - } - - testCases := map[string]struct { - vol v1.PersistentVolume - shouldLabelAndSetAffinity bool - }{ - "PV without initializer": { - vol: pv, - shouldLabelAndSetAffinity: false, - }, - // "PV with initializer to remove": { - // vol: pv, - // shouldLabelAndSetAffinity: true, - // }, - // "PV with other initializers only": { - // vol: pv, - // shouldLabelAndSetAffinity: false, - // }, - // "PV with other initializers first": { - // vol: pv, - // shouldLabelAndSetAffinity: false, - // }, - } - - for d, tc := range testCases { - labeledCh := make(chan bool, 1) - client := fake.NewSimpleClientset() - client.PrependReactor("patch", "persistentvolumes", func(action core.Action) (handled bool, ret runtime.Object, err error) { - patch := action.(core.PatchActionImpl).GetPatch() - obj := &v1.PersistentVolume{} - json.Unmarshal(patch, obj) - if obj.ObjectMeta.Labels["a"] != "1" { - return false, nil, nil - } - if obj.Spec.NodeAffinity == nil { - return false, nil, nil - } - if obj.Spec.NodeAffinity.Required == nil { - return false, nil, nil - } - if len(obj.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { - return false, nil, nil - } - reqs := obj.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions - if len(reqs) != 1 { - return false, nil, nil - } - if reqs[0].Key != "a" || reqs[0].Values[0] != "1" || reqs[0].Operator != v1.NodeSelectorOpIn { - return false, nil, nil - } - labeledCh <- true - return true, nil, nil - }) - - fakeCloud := &fakecloud.FakeCloud{ - VolumeLabelMap: map[string]map[string]string{"awsPV": {"a": "1"}}, - } - pvlController := &PersistentVolumeLabelController{kubeClient: client, cloud: fakeCloud} - pvlController.addLabelsAndAffinityToVolume(&tc.vol) - - select { - case l := <-labeledCh: - if l != tc.shouldLabelAndSetAffinity { - t.Errorf("%s: label and affinity setting of pv failed. expected %t got %t", d, tc.shouldLabelAndSetAffinity, l) - } - case <-time.After(500 * time.Millisecond): - if tc.shouldLabelAndSetAffinity != false { - t.Errorf("%s: timed out waiting for label and affinity setting notification", d) - } - } - } -} diff --git a/pkg/kubeapiserver/options/plugins.go b/pkg/kubeapiserver/options/plugins.go index d62ea7f3375..66c06904adb 100644 --- a/pkg/kubeapiserver/options/plugins.go +++ b/pkg/kubeapiserver/options/plugins.go @@ -111,7 +111,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) { exists.Register(plugins) noderestriction.Register(plugins) nodetaint.Register(plugins) - label.Register(plugins) // DEPRECATED in favor of NewPersistentVolumeLabelController in CCM + label.Register(plugins) // DEPRECATED, future PVs should not rely on labels for zone topology podnodeselector.Register(plugins) podpreset.Register(plugins) podtolerationrestriction.Register(plugins)