diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 87bb8081db2..9a60e5490e6 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -202,3 +202,8 @@ type Zones interface { // outside the kubelets. GetZoneByNodeName(nodeName types.NodeName) (Zone, error) } + +// PVLabeler is an abstract, pluggable interface for fetching labels for volumes +type PVLabeler interface { + GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) +} diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index c41c15f0d01..0e000be68a6 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -201,6 +201,9 @@ const DefaultMaxEBSVolumes = 39 // Used to call RecognizeWellKnownRegions just once var once sync.Once +// AWS implements PVLabeler. +var _ cloudprovider.PVLabeler = (*Cloud)(nil) + // Services is an abstraction over AWS, to allow mocking/other implementations type Services interface { Compute(region string) (EC2, error) @@ -1922,6 +1925,21 @@ func (c *Cloud) DeleteDisk(volumeName KubernetesVolumeID) (bool, error) { return awsDisk.deleteVolume() } +func (c *Cloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) { + // Ignore any volumes that are being provisioned + if pv.Spec.AWSElasticBlockStore.VolumeID == volume.ProvisionedVolumeName { + return nil, nil + } + + spec := KubernetesVolumeID(pv.Spec.AWSElasticBlockStore.VolumeID) + labels, err := c.GetVolumeLabels(spec) + if err != nil { + return nil, err + } + + return labels, nil +} + // GetVolumeLabels implements Volumes.GetVolumeLabels func (c *Cloud) GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error) { awsDisk, err := newAWSDisk(c, volumeName) diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index 98fdc8d6641..f35f244ddba 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -69,6 +69,7 @@ type FakeCloud struct { Provider string addCallLock sync.Mutex cloudprovider.Zone + VolumeLabelMap map[string]map[string]string } type FakeRoute struct { @@ -322,3 +323,10 @@ func (f *FakeCloud) DeleteRoute(clusterName string, route *cloudprovider.Route) delete(f.RouteMap, name) return nil } + +func (c *FakeCloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) { + if val, ok := c.VolumeLabelMap[pv.Name]; ok { + return val, nil + } + return nil, fmt.Errorf("label not found for volume") +} diff --git a/pkg/cloudprovider/providers/gce/gce_disks.go b/pkg/cloudprovider/providers/gce/gce_disks.go index 997b65d624c..ac7f6ee394d 100644 --- a/pkg/cloudprovider/providers/gce/gce_disks.go +++ b/pkg/cloudprovider/providers/gce/gce_disks.go @@ -22,6 +22,8 @@ import ( "net/http" "strings" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/cloudprovider" @@ -89,6 +91,9 @@ type Disks interface { // GCECloud implements Disks. var _ Disks = (*GCECloud)(nil) +// GCECloud implements PVLabeler. +var _ cloudprovider.PVLabeler = (*GCECloud)(nil) + type GCEDisk struct { ZoneInfo zoneType Region string @@ -120,6 +125,23 @@ func newDiskMetricContextRegional(request, region string) *metricContext { return newGenericMetricContext("disk", request, region, unusedMetricLabel, computeV1Version) } +func (gce *GCECloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) { + // Ignore any volumes that are being provisioned + if pv.Spec.GCEPersistentDisk.PDName == volume.ProvisionedVolumeName { + return nil, nil + } + + // If the zone is already labeled, honor the hint + zone := pv.Labels[kubeletapis.LabelZoneFailureDomain] + + labels, err := gce.GetAutoLabelsForPD(pv.Spec.GCEPersistentDisk.PDName, zone) + if err != nil { + return nil, err + } + + return labels, nil +} + func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOnly bool) error { instanceName := mapNodeNameToInstanceName(nodeName) instance, err := gce.getInstanceByName(instanceName) diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD index c5b1f5b010b..541f863885c 100644 --- a/pkg/controller/cloud/BUILD +++ b/pkg/controller/cloud/BUILD @@ -16,12 +16,9 @@ go_library( deps = [ "//pkg/api/v1/node:go_default_library", "//pkg/cloudprovider:go_default_library", - "//pkg/cloudprovider/providers/aws:go_default_library", - "//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/controller:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/util/node:go_default_library", - "//pkg/volume:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -54,7 +51,6 @@ go_test( library = ":go_default_library", deps = [ "//pkg/cloudprovider:go_default_library", - "//pkg/cloudprovider/providers/aws:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/testutil:go_default_library", diff --git a/pkg/controller/cloud/pvlcontroller.go b/pkg/controller/cloud/pvlcontroller.go index d2c18e59029..1a75a9c2867 100644 --- a/pkg/controller/cloud/pvlcontroller.go +++ b/pkg/controller/cloud/pvlcontroller.go @@ -19,7 +19,6 @@ package cloud import ( "encoding/json" "fmt" - "sync" "time" "github.com/golang/glog" @@ -41,22 +40,13 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" - "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/controller" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - vol "k8s.io/kubernetes/pkg/volume" ) const initializerName = "pvlabel.kubernetes.io" // PersistentVolumeLabelController handles adding labels to persistent volumes when they are created type PersistentVolumeLabelController struct { - // Control access to cloud volumes - mutex sync.Mutex - ebsVolumes aws.Volumes - gceCloudProvider *gce.GCECloud - cloud cloudprovider.Interface kubeClient kubernetes.Interface pvlController cache.Controller @@ -192,105 +182,22 @@ func (pvlc *PersistentVolumeLabelController) addLabels(key string) error { func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.PersistentVolume) error { var volumeLabels map[string]string - // Only add labels if in the list of initializers if needsInitialization(vol.Initializers, initializerName) { - if vol.Spec.AWSElasticBlockStore != nil { - labels, err := pvlc.findAWSEBSLabels(vol) + if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok { + labels, err := labeler.GetLabelsForVolume(vol) if err != nil { - return fmt.Errorf("error querying AWS EBS volume %s: %v", vol.Spec.AWSElasticBlockStore.VolumeID, err) - } - volumeLabels = labels - } - if vol.Spec.GCEPersistentDisk != nil { - labels, err := pvlc.findGCEPDLabels(vol) - if err != nil { - return fmt.Errorf("error querying GCE PD volume %s: %v", vol.Spec.GCEPersistentDisk.PDName, err) + return fmt.Errorf("error querying volume %v: %v", vol.Spec, err) } volumeLabels = labels + } else { + glog.V(4).Info("cloud provider does not support PVLabeler") } return pvlc.updateVolume(vol, volumeLabels) } - return nil } -func (pvlc *PersistentVolumeLabelController) findAWSEBSLabels(volume *v1.PersistentVolume) (map[string]string, error) { - // Ignore any volumes that are being provisioned - if volume.Spec.AWSElasticBlockStore.VolumeID == vol.ProvisionedVolumeName { - return nil, nil - } - ebsVolumes, err := pvlc.getEBSVolumes() - if err != nil { - return nil, err - } - - // TODO: GetVolumeLabels is actually a method on the Volumes interface - // If that gets standardized we can refactor to reduce code duplication - spec := aws.KubernetesVolumeID(volume.Spec.AWSElasticBlockStore.VolumeID) - labels, err := ebsVolumes.GetVolumeLabels(spec) - if err != nil { - return nil, err - } - - return labels, nil -} - -// getEBSVolumes returns the AWS Volumes interface for ebs -func (pvlc *PersistentVolumeLabelController) getEBSVolumes() (aws.Volumes, error) { - pvlc.mutex.Lock() - defer pvlc.mutex.Unlock() - - if pvlc.ebsVolumes == nil { - awsCloudProvider := pvlc.cloud.(*aws.Cloud) - awsCloudProvider, ok := pvlc.cloud.(*aws.Cloud) - if !ok { - // GetCloudProvider has gone very wrong - return nil, fmt.Errorf("error retrieving AWS cloud provider") - } - pvlc.ebsVolumes = awsCloudProvider - } - return pvlc.ebsVolumes, nil -} - -func (pvlc *PersistentVolumeLabelController) findGCEPDLabels(volume *v1.PersistentVolume) (map[string]string, error) { - // Ignore any volumes that are being provisioned - if volume.Spec.GCEPersistentDisk.PDName == vol.ProvisionedVolumeName { - return nil, nil - } - - provider, err := pvlc.getGCECloudProvider() - if err != nil { - return nil, err - } - - // If the zone is already labeled, honor the hint - zone := volume.Labels[kubeletapis.LabelZoneFailureDomain] - - labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName, zone) - if err != nil { - return nil, err - } - - return labels, nil -} - -// getGCECloudProvider returns the GCE cloud provider, for use for querying volume labels -func (pvlc *PersistentVolumeLabelController) getGCECloudProvider() (*gce.GCECloud, error) { - pvlc.mutex.Lock() - defer pvlc.mutex.Unlock() - - if pvlc.gceCloudProvider == nil { - gceCloudProvider, ok := pvlc.cloud.(*gce.GCECloud) - if !ok { - // GetCloudProvider has gone very wrong - return nil, fmt.Errorf("error retrieving GCE cloud provider") - } - pvlc.gceCloudProvider = gceCloudProvider - } - return pvlc.gceCloudProvider, nil -} - func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) { volName := vol.Name newVolume := vol.DeepCopyObject().(*v1.PersistentVolume) diff --git a/pkg/controller/cloud/pvlcontroller_test.go b/pkg/controller/cloud/pvlcontroller_test.go index 89567349d3b..48b079122c5 100644 --- a/pkg/controller/cloud/pvlcontroller_test.go +++ b/pkg/controller/cloud/pvlcontroller_test.go @@ -18,7 +18,6 @@ package cloud import ( "encoding/json" - "fmt" "testing" "time" @@ -26,54 +25,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" - "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" ) -type mockVolumes struct { - volumeLabels map[string]string - volumeLabelsError error -} - -var _ aws.Volumes = &mockVolumes{} - -func (v *mockVolumes) AttachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName, readOnly bool) (string, error) { - return "", fmt.Errorf("not implemented") -} - -func (v *mockVolumes) DetachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName) (string, error) { - return "", fmt.Errorf("not implemented") -} - -func (v *mockVolumes) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName aws.KubernetesVolumeID, err error) { - return "", fmt.Errorf("not implemented") -} - -func (v *mockVolumes) DeleteDisk(volumeName aws.KubernetesVolumeID) (bool, error) { - return false, fmt.Errorf("not implemented") -} - -func (v *mockVolumes) GetVolumeLabels(volumeName aws.KubernetesVolumeID) (map[string]string, error) { - return v.volumeLabels, v.volumeLabelsError -} - -func (c *mockVolumes) GetDiskPath(volumeName aws.KubernetesVolumeID) (string, error) { - return "", fmt.Errorf("not implemented") -} - -func (c *mockVolumes) DiskIsAttached(volumeName aws.KubernetesVolumeID, nodeName types.NodeName) (bool, error) { - return false, fmt.Errorf("not implemented") -} - -func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) { - return nil, fmt.Errorf("not implemented") -} - func TestCreatePatch(t *testing.T) { ignoredPV := v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ @@ -208,7 +166,11 @@ func TestAddLabelsToVolume(t *testing.T) { labeledCh <- true return true, nil, nil }) - pvlController := &PersistentVolumeLabelController{kubeClient: client, ebsVolumes: &mockVolumes{volumeLabels: map[string]string{"a": "1"}}} + + fakeCloud := &fakecloud.FakeCloud{ + VolumeLabelMap: map[string]map[string]string{"awsPV": {"a": "1"}}, + } + pvlController := &PersistentVolumeLabelController{kubeClient: client, cloud: fakeCloud} tc.vol.ObjectMeta.Initializers = tc.initializers pvlController.addLabelsToVolume(&tc.vol)