Merge pull request #52169 from dims/remove-links-to-specific-cloud-providers

Automatic merge from submit-queue (batch tested with PRs 52007, 52196, 52169, 52263, 52291)

Remove links to GCE/AWS cloud providers from PersistentVolumeCo…

…ntroller




**What this PR does / why we need it**:

We should be able to build a cloud-controller-manager without having to
pull in code specific to GCE and AWS clouds. Note that this is a tactical
fix for now, we should have allow PVLabeler to be passed into the
PersistentVolumeController, maybe come up with better interfaces etc. Since
it is too late to do all that for 1.8, we just move cloud specific code
to where they belong and we check for PVLabeler method and use it where
needed.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

Fixes #51629

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-09-12 08:46:12 -07:00 committed by GitHub
commit 1f072babe8
7 changed files with 63 additions and 145 deletions

View File

@ -202,3 +202,8 @@ type Zones interface {
// outside the kubelets. // outside the kubelets.
GetZoneByNodeName(nodeName types.NodeName) (Zone, error) GetZoneByNodeName(nodeName types.NodeName) (Zone, error)
} }
// PVLabeler is an abstract, pluggable interface for fetching labels for volumes
type PVLabeler interface {
GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error)
}

View File

@ -201,6 +201,9 @@ const DefaultMaxEBSVolumes = 39
// Used to call RecognizeWellKnownRegions just once // Used to call RecognizeWellKnownRegions just once
var once sync.Once var once sync.Once
// AWS implements PVLabeler.
var _ cloudprovider.PVLabeler = (*Cloud)(nil)
// Services is an abstraction over AWS, to allow mocking/other implementations // Services is an abstraction over AWS, to allow mocking/other implementations
type Services interface { type Services interface {
Compute(region string) (EC2, error) Compute(region string) (EC2, error)
@ -1922,6 +1925,21 @@ func (c *Cloud) DeleteDisk(volumeName KubernetesVolumeID) (bool, error) {
return awsDisk.deleteVolume() return awsDisk.deleteVolume()
} }
func (c *Cloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if pv.Spec.AWSElasticBlockStore.VolumeID == volume.ProvisionedVolumeName {
return nil, nil
}
spec := KubernetesVolumeID(pv.Spec.AWSElasticBlockStore.VolumeID)
labels, err := c.GetVolumeLabels(spec)
if err != nil {
return nil, err
}
return labels, nil
}
// GetVolumeLabels implements Volumes.GetVolumeLabels // GetVolumeLabels implements Volumes.GetVolumeLabels
func (c *Cloud) GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error) { func (c *Cloud) GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error) {
awsDisk, err := newAWSDisk(c, volumeName) awsDisk, err := newAWSDisk(c, volumeName)

View File

@ -69,6 +69,7 @@ type FakeCloud struct {
Provider string Provider string
addCallLock sync.Mutex addCallLock sync.Mutex
cloudprovider.Zone cloudprovider.Zone
VolumeLabelMap map[string]map[string]string
} }
type FakeRoute struct { type FakeRoute struct {
@ -322,3 +323,10 @@ func (f *FakeCloud) DeleteRoute(clusterName string, route *cloudprovider.Route)
delete(f.RouteMap, name) delete(f.RouteMap, name)
return nil return nil
} }
func (c *FakeCloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
if val, ok := c.VolumeLabelMap[pv.Name]; ok {
return val, nil
}
return nil, fmt.Errorf("label not found for volume")
}

View File

@ -22,6 +22,8 @@ import (
"net/http" "net/http"
"strings" "strings"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
@ -89,6 +91,9 @@ type Disks interface {
// GCECloud implements Disks. // GCECloud implements Disks.
var _ Disks = (*GCECloud)(nil) var _ Disks = (*GCECloud)(nil)
// GCECloud implements PVLabeler.
var _ cloudprovider.PVLabeler = (*GCECloud)(nil)
type GCEDisk struct { type GCEDisk struct {
ZoneInfo zoneType ZoneInfo zoneType
Region string Region string
@ -120,6 +125,23 @@ func newDiskMetricContextRegional(request, region string) *metricContext {
return newGenericMetricContext("disk", request, region, unusedMetricLabel, computeV1Version) return newGenericMetricContext("disk", request, region, unusedMetricLabel, computeV1Version)
} }
func (gce *GCECloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if pv.Spec.GCEPersistentDisk.PDName == volume.ProvisionedVolumeName {
return nil, nil
}
// If the zone is already labeled, honor the hint
zone := pv.Labels[kubeletapis.LabelZoneFailureDomain]
labels, err := gce.GetAutoLabelsForPD(pv.Spec.GCEPersistentDisk.PDName, zone)
if err != nil {
return nil, err
}
return labels, nil
}
func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOnly bool) error { func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOnly bool) error {
instanceName := mapNodeNameToInstanceName(nodeName) instanceName := mapNodeNameToInstanceName(nodeName)
instance, err := gce.getInstanceByName(instanceName) instance, err := gce.getInstanceByName(instanceName)

View File

@ -16,12 +16,9 @@ go_library(
deps = [ deps = [
"//pkg/api/v1/node:go_default_library", "//pkg/api/v1/node:go_default_library",
"//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis:go_default_library",
"//pkg/util/node:go_default_library", "//pkg/util/node:go_default_library",
"//pkg/volume:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
@ -54,7 +51,6 @@ go_test(
library = ":go_default_library", library = ":go_default_library",
deps = [ deps = [
"//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/testutil:go_default_library", "//pkg/controller/testutil:go_default_library",

View File

@ -19,7 +19,6 @@ package cloud
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -41,22 +40,13 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
vol "k8s.io/kubernetes/pkg/volume"
) )
const initializerName = "pvlabel.kubernetes.io" const initializerName = "pvlabel.kubernetes.io"
// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created // PersistentVolumeLabelController handles adding labels to persistent volumes when they are created
type PersistentVolumeLabelController struct { type PersistentVolumeLabelController struct {
// Control access to cloud volumes
mutex sync.Mutex
ebsVolumes aws.Volumes
gceCloudProvider *gce.GCECloud
cloud cloudprovider.Interface cloud cloudprovider.Interface
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
pvlController cache.Controller pvlController cache.Controller
@ -192,105 +182,22 @@ func (pvlc *PersistentVolumeLabelController) addLabels(key string) error {
func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.PersistentVolume) error { func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.PersistentVolume) error {
var volumeLabels map[string]string var volumeLabels map[string]string
// Only add labels if in the list of initializers // Only add labels if in the list of initializers
if needsInitialization(vol.Initializers, initializerName) { if needsInitialization(vol.Initializers, initializerName) {
if vol.Spec.AWSElasticBlockStore != nil { if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok {
labels, err := pvlc.findAWSEBSLabels(vol) labels, err := labeler.GetLabelsForVolume(vol)
if err != nil { if err != nil {
return fmt.Errorf("error querying AWS EBS volume %s: %v", vol.Spec.AWSElasticBlockStore.VolumeID, err) return fmt.Errorf("error querying volume %v: %v", vol.Spec, err)
}
volumeLabels = labels
}
if vol.Spec.GCEPersistentDisk != nil {
labels, err := pvlc.findGCEPDLabels(vol)
if err != nil {
return fmt.Errorf("error querying GCE PD volume %s: %v", vol.Spec.GCEPersistentDisk.PDName, err)
} }
volumeLabels = labels volumeLabels = labels
} else {
glog.V(4).Info("cloud provider does not support PVLabeler")
} }
return pvlc.updateVolume(vol, volumeLabels) return pvlc.updateVolume(vol, volumeLabels)
} }
return nil return nil
} }
func (pvlc *PersistentVolumeLabelController) findAWSEBSLabels(volume *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if volume.Spec.AWSElasticBlockStore.VolumeID == vol.ProvisionedVolumeName {
return nil, nil
}
ebsVolumes, err := pvlc.getEBSVolumes()
if err != nil {
return nil, err
}
// TODO: GetVolumeLabels is actually a method on the Volumes interface
// If that gets standardized we can refactor to reduce code duplication
spec := aws.KubernetesVolumeID(volume.Spec.AWSElasticBlockStore.VolumeID)
labels, err := ebsVolumes.GetVolumeLabels(spec)
if err != nil {
return nil, err
}
return labels, nil
}
// getEBSVolumes returns the AWS Volumes interface for ebs
func (pvlc *PersistentVolumeLabelController) getEBSVolumes() (aws.Volumes, error) {
pvlc.mutex.Lock()
defer pvlc.mutex.Unlock()
if pvlc.ebsVolumes == nil {
awsCloudProvider := pvlc.cloud.(*aws.Cloud)
awsCloudProvider, ok := pvlc.cloud.(*aws.Cloud)
if !ok {
// GetCloudProvider has gone very wrong
return nil, fmt.Errorf("error retrieving AWS cloud provider")
}
pvlc.ebsVolumes = awsCloudProvider
}
return pvlc.ebsVolumes, nil
}
func (pvlc *PersistentVolumeLabelController) findGCEPDLabels(volume *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if volume.Spec.GCEPersistentDisk.PDName == vol.ProvisionedVolumeName {
return nil, nil
}
provider, err := pvlc.getGCECloudProvider()
if err != nil {
return nil, err
}
// If the zone is already labeled, honor the hint
zone := volume.Labels[kubeletapis.LabelZoneFailureDomain]
labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName, zone)
if err != nil {
return nil, err
}
return labels, nil
}
// getGCECloudProvider returns the GCE cloud provider, for use for querying volume labels
func (pvlc *PersistentVolumeLabelController) getGCECloudProvider() (*gce.GCECloud, error) {
pvlc.mutex.Lock()
defer pvlc.mutex.Unlock()
if pvlc.gceCloudProvider == nil {
gceCloudProvider, ok := pvlc.cloud.(*gce.GCECloud)
if !ok {
// GetCloudProvider has gone very wrong
return nil, fmt.Errorf("error retrieving GCE cloud provider")
}
pvlc.gceCloudProvider = gceCloudProvider
}
return pvlc.gceCloudProvider, nil
}
func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) { func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) {
volName := vol.Name volName := vol.Name
newVolume := vol.DeepCopyObject().(*v1.PersistentVolume) newVolume := vol.DeepCopyObject().(*v1.PersistentVolume)

View File

@ -18,7 +18,6 @@ package cloud
import ( import (
"encoding/json" "encoding/json"
"fmt"
"testing" "testing"
"time" "time"
@ -26,54 +25,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
) )
type mockVolumes struct {
volumeLabels map[string]string
volumeLabelsError error
}
var _ aws.Volumes = &mockVolumes{}
func (v *mockVolumes) AttachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName, readOnly bool) (string, error) {
return "", fmt.Errorf("not implemented")
}
func (v *mockVolumes) DetachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName) (string, error) {
return "", fmt.Errorf("not implemented")
}
func (v *mockVolumes) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName aws.KubernetesVolumeID, err error) {
return "", fmt.Errorf("not implemented")
}
func (v *mockVolumes) DeleteDisk(volumeName aws.KubernetesVolumeID) (bool, error) {
return false, fmt.Errorf("not implemented")
}
func (v *mockVolumes) GetVolumeLabels(volumeName aws.KubernetesVolumeID) (map[string]string, error) {
return v.volumeLabels, v.volumeLabelsError
}
func (c *mockVolumes) GetDiskPath(volumeName aws.KubernetesVolumeID) (string, error) {
return "", fmt.Errorf("not implemented")
}
func (c *mockVolumes) DiskIsAttached(volumeName aws.KubernetesVolumeID, nodeName types.NodeName) (bool, error) {
return false, fmt.Errorf("not implemented")
}
func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {
return nil, fmt.Errorf("not implemented")
}
func TestCreatePatch(t *testing.T) { func TestCreatePatch(t *testing.T) {
ignoredPV := v1.PersistentVolume{ ignoredPV := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -208,7 +166,11 @@ func TestAddLabelsToVolume(t *testing.T) {
labeledCh <- true labeledCh <- true
return true, nil, nil return true, nil, nil
}) })
pvlController := &PersistentVolumeLabelController{kubeClient: client, ebsVolumes: &mockVolumes{volumeLabels: map[string]string{"a": "1"}}}
fakeCloud := &fakecloud.FakeCloud{
VolumeLabelMap: map[string]map[string]string{"awsPV": {"a": "1"}},
}
pvlController := &PersistentVolumeLabelController{kubeClient: client, cloud: fakeCloud}
tc.vol.ObjectMeta.Initializers = tc.initializers tc.vol.ObjectMeta.Initializers = tc.initializers
pvlController.addLabelsToVolume(&tc.vol) pvlController.addLabelsToVolume(&tc.vol)