From e711cbf912e0867995f587ce5ef283ca1f58beb7 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Thu, 16 Jun 2016 13:36:55 -0400 Subject: [PATCH] GCE/AWS: Spread PetSet volume creation across zones Long term we plan on integrating this into the scheduler, but in the short term we use the volume name to place it onto a zone. We hash the volume name so we don't bias to the first few zones. If the volume name "looks like" a PetSet volume name (ending with -) then we use the number as an offset. In that case we hash the base name. Fixes #27256 --- pkg/cloudprovider/providers/aws/aws.go | 53 +++++++++++++++++-- pkg/cloudprovider/providers/gce/gce.go | 32 +++++++++++ pkg/controller/persistentvolume/controller.go | 1 + pkg/volume/aws_ebs/aws_util.go | 1 + pkg/volume/gce_pd/gce_util.go | 6 ++- pkg/volume/plugins.go | 2 + pkg/volume/util.go | 49 +++++++++++++++++ 7 files changed, 138 insertions(+), 6 deletions(-) diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index f836abf9016..ee53528e9dd 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -49,6 +49,7 @@ import ( aws_credentials "k8s.io/kubernetes/pkg/credentialprovider/aws" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/volume" ) const ProviderName = "aws" @@ -198,6 +199,7 @@ type EC2Metadata interface { type VolumeOptions struct { CapacityGB int Tags map[string]string + PVCName string } // Volumes is an interface for managing cloud-provisioned volumes @@ -914,6 +916,46 @@ func (aws *AWSCloud) List(filter string) ([]string, error) { return aws.getInstancesByRegex(filter) } +// getAllZones retrieves a list of all the zones in which nodes are running +// It currently involves querying all instances +func (c *AWSCloud) getAllZones() (sets.String, error) { + // TODO: Caching, although we currently only use this in volume creation + // TODO: We could also query for subnets, I think + + filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")} + filters = c.addFilters(filters) + request := &ec2.DescribeInstancesInput{ + Filters: filters, + } + + instances, err := c.ec2.DescribeInstances(request) + if err != nil { + return nil, err + } + if len(instances) == 0 { + return nil, fmt.Errorf("no instances returned") + } + + zones := sets.NewString() + + for _, instance := range instances { + // Only return fully-ready instances when listing instances + // (vs a query by name, where we will return it if we find it) + if orEmpty(instance.State.Name) == "pending" { + glog.V(2).Infof("Skipping EC2 instance (pending): %s", *instance.InstanceId) + continue + } + + if instance.Placement != nil { + zone := aws.StringValue(instance.Placement.AvailabilityZone) + zones.Insert(zone) + } + } + + glog.V(2).Infof("Found instances in zones %s", zones) + return zones, nil +} + // GetZone implements Zones.GetZone func (c *AWSCloud) GetZone() (cloudprovider.Zone, error) { return cloudprovider.Zone{ @@ -1387,11 +1429,14 @@ func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, e return hostDevicePath, err } -// Implements Volumes.CreateVolume +// CreateDisk implements Volumes.CreateDisk func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) { - // Default to creating in the current zone - // TODO: Spread across zones? - createAZ := s.selfAWSInstance.availabilityZone + allZones, err := s.getAllZones() + if err != nil { + return "", fmt.Errorf("error querying for all zones: %v", err) + } + + createAZ := volume.ChooseZoneForVolume(allZones, volumeOptions.PVCName) // TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?) request := &ec2.CreateVolumeInput{} diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index a45912fecb3..ad4afe55d16 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -2066,6 +2066,38 @@ func (gce *GCECloud) List(filter string) ([]string, error) { return instances, nil } +// GetAllZones returns all the zones in which nodes are running +func (gce *GCECloud) GetAllZones() (sets.String, error) { + if len(gce.managedZones) == 1 { + return sets.NewString(gce.managedZones...), nil + } + + // TODO: Caching, but this is currently only called when we are creating a volume, + // which is a relatively infrequent operation, and this is only # zones API calls + zones := sets.NewString() + + // TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically) + for _, zone := range gce.managedZones { + // We only retrieve one page in each zone - we only care about existence + listCall := gce.service.Instances.List(gce.projectID, zone) + + // No filter: We assume that a zone is either used or unused + + // Just a minimal set of fields - we only care about existence + listCall = listCall.Fields("items(name)") + + res, err := listCall.Do() + if err != nil { + return nil, err + } + if len(res.Items) != 0 { + zones.Insert(zone) + } + } + + return zones, nil +} + func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) { for _, item := range metadata.Items { if item.Key == key { diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go index d17f35cb235..3bf1c1ac1ce 100644 --- a/pkg/controller/persistentvolume/controller.go +++ b/pkg/controller/persistentvolume/controller.go @@ -1125,6 +1125,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa CloudTags: &tags, ClusterName: ctrl.clusterName, PVName: pvName, + PVCName: claim.Name, } // Provision the volume diff --git a/pkg/volume/aws_ebs/aws_util.go b/pkg/volume/aws_ebs/aws_util.go index fba3f007038..c3097f31c0d 100644 --- a/pkg/volume/aws_ebs/aws_util.go +++ b/pkg/volume/aws_ebs/aws_util.go @@ -82,6 +82,7 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (strin volumeOptions := &aws.VolumeOptions{ CapacityGB: requestGB, Tags: tags, + PVCName: c.options.PVCName, } name, err := cloud.CreateDisk(volumeOptions) diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go index bb8b31da4bd..e77817dbea1 100644 --- a/pkg/volume/gce_pd/gce_util.go +++ b/pkg/volume/gce_pd/gce_util.go @@ -82,13 +82,15 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin // The disk will be created in the zone in which this code is currently running // TODO: We should support auto-provisioning volumes in multiple/specified zones - zone, err := cloud.GetZone() + zones, err := cloud.GetAllZones() if err != nil { glog.V(2).Infof("error getting zone information from GCE: %v", err) return "", 0, nil, err } - err = cloud.CreateDisk(name, zone.FailureDomain, int64(requestGB), *c.options.CloudTags) + zone := volume.ChooseZoneForVolume(zones, c.options.PVCName) + + err = cloud.CreateDisk(name, zone, int64(requestGB), *c.options.CloudTags) if err != nil { glog.V(2).Infof("Error creating GCE PD volume: %v", err) return "", 0, nil, err diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 717dd138084..70760133f45 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -49,6 +49,8 @@ type VolumeOptions struct { // PV.Name of the appropriate PersistentVolume. Used to generate cloud // volume name. PVName string + // PVC.Name of the PersistentVolumeClaim, if we are auto-provisioning. + PVCName string // Unique name of Kubernetes cluster. ClusterName string // Tags to attach to the real volume in the cloud provider - e.g. AWS EBS diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 696f9254b08..40d6c872f87 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -28,8 +28,13 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "hash/fnv" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/util/sets" + "math/rand" + "strconv" + "strings" ) // RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume @@ -187,3 +192,47 @@ func GenerateVolumeName(clusterName, pvName string, maxLength int) string { } return prefix + "-" + pvName } + +// ChooseZone implements our heuristics for choosing a zone for volume creation based on the volume name +func ChooseZoneForVolume(zones sets.String, pvcName string) string { + // We create the volume in a zone determined by the name + // Eventually the scheduler will coordinate placement into an available zone + var hash uint32 + var index uint32 + + if pvcName == "" { + // We should always be called with a name; this shouldn't happen + glog.Warningf("No Name defined during volume create; choosing random zone") + + hash = rand.Uint32() + } else { + hashString := pvcName + + // Heuristic to make sure that volumes in a PetSet are spread across zones + // PetSet PVCs are (currently) named ClaimName-PetSetName-Id, + // where Id is an integer index + lastDash := strings.LastIndexByte(pvcName, '-') + if lastDash != -1 { + petIDString := pvcName[lastDash+1:] + petID, err := strconv.ParseUint(petIDString, 10, 32) + if err == nil { + // Offset by the pet id, so we round-robin across zones + index = uint32(petID) + // We still hash the volume name, but only the base + hashString = pvcName[:lastDash] + glog.V(2).Infof("Detected PetSet-style volume name %q; index=%d", pvcName, index) + } + } + + // We hash the (base) volume name, so we don't bias towards the first N zones + h := fnv.New32() + h.Write([]byte(hashString)) + hash = h.Sum32() + } + + zoneSlice := zones.List() + zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))] + + glog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice) + return zone +}