diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go
index 3d32f5cdd99..df406e06450 100644
--- a/pkg/cloudprovider/providers/aws/aws.go
+++ b/pkg/cloudprovider/providers/aws/aws.go
@@ -48,6 +48,7 @@ import (
 	aws_credentials "k8s.io/kubernetes/pkg/credentialprovider/aws"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/volume"
 )
 
 const ProviderName = "aws"
@@ -197,6 +198,7 @@ type EC2Metadata interface {
 type VolumeOptions struct {
 	CapacityGB int
 	Tags       map[string]string
+	PVCName    string
 }
 
 // Volumes is an interface for managing cloud-provisioned volumes
@@ -913,6 +915,49 @@ func (aws *AWSCloud) List(filter string) ([]string, error) {
 	return aws.getInstancesByRegex(filter)
 }
 
+// getAllZones retrieves a list of all the zones in which nodes are running
+// It currently involves querying all instances
+func (c *AWSCloud) getAllZones() (sets.String, error) {
+	// We don't currently cache this; it is currently used only in volume
+	// creation which is expected to be a comparatively rare occurrence.
+
+	// TODO: Caching / expose api.Nodes to the cloud provider?
+	// TODO: We could also query for subnets, I think
+
+	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
+	filters = c.addFilters(filters)
+	request := &ec2.DescribeInstancesInput{
+		Filters: filters,
+	}
+
+	instances, err := c.ec2.DescribeInstances(request)
+	if err != nil {
+		return nil, err
+	}
+	if len(instances) == 0 {
+		return nil, fmt.Errorf("no instances returned")
+	}
+
+	zones := sets.NewString()
+
+	for _, instance := range instances {
+		// Only return fully-ready instances when listing instances
+		// (vs a query by name, where we will return it if we find it)
+		if orEmpty(instance.State.Name) == "pending" {
+			glog.V(2).Infof("Skipping EC2 instance (pending): %s", *instance.InstanceId)
+			continue
+		}
+
+		if instance.Placement != nil {
+			zone := aws.StringValue(instance.Placement.AvailabilityZone)
+			zones.Insert(zone)
+		}
+	}
+
+	glog.V(2).Infof("Found instances in zones %s", zones)
+	return zones, nil
+}
+
 // GetZone implements Zones.GetZone
 func (c *AWSCloud) GetZone() (cloudprovider.Zone, error) {
 	return cloudprovider.Zone{
@@ -1383,11 +1428,14 @@ func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, e
 	return hostDevicePath, err
 }
 
-// Implements Volumes.CreateVolume
+// CreateDisk implements Volumes.CreateDisk
 func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
-	// Default to creating in the current zone
-	// TODO: Spread across zones?
-	createAZ := s.selfAWSInstance.availabilityZone
+	allZones, err := s.getAllZones()
+	if err != nil {
+		return "", fmt.Errorf("error querying for all zones: %v", err)
+	}
+
+	createAZ := volume.ChooseZoneForVolume(allZones, volumeOptions.PVCName)
 
 	// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
 	request := &ec2.CreateVolumeInput{}
diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go
index 92d09ce13be..7352bd37bff 100644
--- a/pkg/cloudprovider/providers/gce/gce.go
+++ b/pkg/cloudprovider/providers/gce/gce.go
@@ -123,7 +123,9 @@ type Disks interface {
 
 	// GetAutoLabelsForPD returns labels to apply to PersistentVolume
 	// representing this PD, namely failure domain and zone.
-	GetAutoLabelsForPD(name string) (map[string]string, error)
+	// zone can be provided to specify the zone for the PD;
+	// if empty, all managed zones will be searched.
+	GetAutoLabelsForPD(name string, zone string) (map[string]string, error)
 }
 
 func init() {
@@ -2069,6 +2071,48 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
 	return instances, nil
 }
 
+// GetAllZones returns all the zones in which nodes are running
+func (gce *GCECloud) GetAllZones() (sets.String, error) {
+	// Fast-path for non-multizone
+	if len(gce.managedZones) == 1 {
+		return sets.NewString(gce.managedZones...), nil
+	}
+
+	// TODO: Caching, but this is currently only called when we are creating a volume,
+	// which is a relatively infrequent operation, and this is only # zones API calls
+	zones := sets.NewString()
+
+	// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
+	for _, zone := range gce.managedZones {
+		// We only retrieve one page in each zone - we only care about existence
+		listCall := gce.service.Instances.List(gce.projectID, zone)
+
+		// No filter: We assume that a zone is either used or unused
+		// We could only consider running nodes (like we do in List above),
+		// but probably if instances are starting we still want to consider them.
+		// I think we should wait until we have a reason to make the
+		// call one way or the other; we generally can't guarantee correct
+		// volume spreading if the set of zones is changing
+		// (and volume spreading is currently only a heuristic).
+		// Long term we want to replace GetAllZones (which primarily supports volume
+		// spreading) with a scheduler policy that is able to see the global state of
+		// volumes and the health of zones.
+
+		// Just a minimal set of fields - we only care about existence
+		listCall = listCall.Fields("items(name)")
+
+		res, err := listCall.Do()
+		if err != nil {
+			return nil, err
+		}
+		if len(res.Items) != 0 {
+			zones.Insert(zone)
+		}
+	}
+
+	return zones, nil
+}
+
 func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
 	for _, item := range metadata.Items {
 		if item.Key == key {
@@ -2221,13 +2265,33 @@ func (gce *GCECloud) DeleteDisk(diskToDelete string) error {
 
 // Builds the labels that should be automatically added to a PersistentVolume backed by a GCE PD
 // Specifically, this builds FailureDomain (zone) and Region labels.
 // The PersistentVolumeLabel admission controller calls this and adds the labels when a PV is created.
-func (gce *GCECloud) GetAutoLabelsForPD(name string) (map[string]string, error) {
-	disk, err := gce.getDiskByNameUnknownZone(name)
-	if err != nil {
-		return nil, err
+// If zone is specified, the volume will only be found in the specified zone,
+// otherwise all managed zones will be searched.
+func (gce *GCECloud) GetAutoLabelsForPD(name string, zone string) (map[string]string, error) {
+	var disk *gceDisk
+	var err error
+	if zone == "" {
+		// We would like as far as possible to avoid this case,
+		// because GCE doesn't guarantee that volumes are uniquely named per region,
+		// just per zone. However, creation of GCE PDs was originally done only
+		// by name, so we have to continue to support that.
+		// However, wherever possible the zone should be passed (and it is passed
+		// for most cases that we can control, e.g. dynamic volume provisioning)
+		disk, err = gce.getDiskByNameUnknownZone(name)
+		if err != nil {
+			return nil, err
+		}
+		zone = disk.Zone
+	} else {
+		// We could assume the disk exists; we have all the information we need
+		// However it is more consistent to ensure the disk exists,
+		// and in future we may gather additional information (e.g. disk type, IOPS etc)
+		disk, err = gce.getDiskByName(name, zone)
+		if err != nil {
+			return nil, err
+		}
 	}
-	zone := disk.Zone
 	region, err := GetGCERegion(zone)
 	if err != nil {
 		return nil, err
diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go
index e115fcba66f..69e1ba54d96 100644
--- a/pkg/controller/persistentvolume/controller.go
+++ b/pkg/controller/persistentvolume/controller.go
@@ -1131,6 +1131,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
 		CloudTags:   &tags,
 		ClusterName: ctrl.clusterName,
 		PVName:      pvName,
+		PVCName:     claim.Name,
 	}
 
 	// Provision the volume
diff --git a/pkg/volume/aws_ebs/aws_util.go b/pkg/volume/aws_ebs/aws_util.go
index fba3f007038..c3097f31c0d 100644
--- a/pkg/volume/aws_ebs/aws_util.go
+++ b/pkg/volume/aws_ebs/aws_util.go
@@ -82,6 +82,7 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (strin
 	volumeOptions := &aws.VolumeOptions{
 		CapacityGB: requestGB,
 		Tags:       tags,
+		PVCName:    c.options.PVCName,
 	}
 
 	name, err := cloud.CreateDisk(volumeOptions)
diff --git a/pkg/volume/gce_pd/attacher_test.go b/pkg/volume/gce_pd/attacher_test.go
index 85ca7a78bc7..c3c4b596b4f 100644
--- a/pkg/volume/gce_pd/attacher_test.go
+++ b/pkg/volume/gce_pd/attacher_test.go
@@ -359,6 +359,6 @@ func (testcase *testcase) DeleteDisk(diskToDelete string) error {
 	return errors.New("Not implemented")
 }
 
-func (testcase *testcase) GetAutoLabelsForPD(name string) (map[string]string, error) {
+func (testcase *testcase) GetAutoLabelsForPD(name string, zone string) (map[string]string, error) {
 	return map[string]string{}, errors.New("Not implemented")
 }
diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go
index bb8b31da4bd..717b9b0df10 100644
--- a/pkg/volume/gce_pd/gce_util.go
+++ b/pkg/volume/gce_pd/gce_util.go
@@ -82,20 +82,22 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin
 
 	// The disk will be created in the zone in which this code is currently running
 	// TODO: We should support auto-provisioning volumes in multiple/specified zones
-	zone, err := cloud.GetZone()
+	zones, err := cloud.GetAllZones()
 	if err != nil {
 		glog.V(2).Infof("error getting zone information from GCE: %v", err)
 		return "", 0, nil, err
 	}
 
-	err = cloud.CreateDisk(name, zone.FailureDomain, int64(requestGB), *c.options.CloudTags)
+	zone := volume.ChooseZoneForVolume(zones, c.options.PVCName)
+
+	err = cloud.CreateDisk(name, zone, int64(requestGB), *c.options.CloudTags)
 	if err != nil {
 		glog.V(2).Infof("Error creating GCE PD volume: %v", err)
 		return "", 0, nil, err
 	}
 	glog.V(2).Infof("Successfully created GCE PD volume %s", name)
 
-	labels, err := cloud.GetAutoLabelsForPD(name)
+	labels, err := cloud.GetAutoLabelsForPD(name, zone)
 	if err != nil {
 		// We don't really want to leak the volume here...
 		glog.Errorf("error getting labels for volume %q: %v", name, err)
diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go
index 717dd138084..6a58b68812d 100644
--- a/pkg/volume/plugins.go
+++ b/pkg/volume/plugins.go
@@ -49,6 +49,8 @@ type VolumeOptions struct {
 	// PV.Name of the appropriate PersistentVolume. Used to generate cloud
 	// volume name.
 	PVName string
+	// PVC.Name of the PersistentVolumeClaim; only set during dynamic provisioning.
+	PVCName string
 	// Unique name of Kubernetes cluster.
 	ClusterName string
 	// Tags to attach to the real volume in the cloud provider - e.g. AWS EBS
diff --git a/pkg/volume/util.go b/pkg/volume/util.go
index 696f9254b08..9c2b352b931 100644
--- a/pkg/volume/util.go
+++ b/pkg/volume/util.go
@@ -28,8 +28,13 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 
 	"github.com/golang/glog"
+	"hash/fnv"
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/resource"
+	"k8s.io/kubernetes/pkg/util/sets"
+	"math/rand"
+	"strconv"
+	"strings"
 )
 
 // RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
@@ -187,3 +192,59 @@ func GenerateVolumeName(clusterName, pvName string, maxLength int) string {
 	}
 	return prefix + "-" + pvName
 }
+
+// ChooseZoneForVolume implements our heuristics for choosing a zone for volume creation based on the volume name
+// Volumes are generally round-robin-ed across all active zones, using the hash of the PVC Name.
+// However, if the PVCName ends with `-<integer>`, we will hash the prefix, and then add the integer to the hash.
+// This means that a PetSet's volumes (`claimname-petsetname-id`) will spread across available zones,
+// assuming the id values are consecutive.
+func ChooseZoneForVolume(zones sets.String, pvcName string) string {
+	// We create the volume in a zone determined by the name
+	// Eventually the scheduler will coordinate placement into an available zone
+	var hash uint32
+	var index uint32
+
+	if pvcName == "" {
+		// We should always be called with a name; this shouldn't happen
+		glog.Warningf("No name defined during volume create; choosing random zone")
+
+		hash = rand.Uint32()
+	} else {
+		hashString := pvcName
+
+		// Heuristic to make sure that volumes in a PetSet are spread across zones
+		// PetSet PVCs are (currently) named ClaimName-PetSetName-Id,
+		// where Id is an integer index
+		lastDash := strings.LastIndexByte(pvcName, '-')
+		if lastDash != -1 {
+			petIDString := pvcName[lastDash+1:]
+			petID, err := strconv.ParseUint(petIDString, 10, 32)
+			if err == nil {
+				// Offset by the pet id, so we round-robin across zones
+				index = uint32(petID)
+				// We still hash the volume name, but only the base
+				hashString = pvcName[:lastDash]
+				glog.V(2).Infof("Detected PetSet-style volume name %q; index=%d", pvcName, index)
+			}
+		}
+
+		// We hash the (base) volume name, so we don't bias towards the first N zones
+		h := fnv.New32()
+		h.Write([]byte(hashString))
+		hash = h.Sum32()
+	}
+
+	// Zones.List returns zones in a consistent order (sorted)
+	// We do have a potential failure case where volumes will not be properly spread,
+	// if the set of zones changes during PetSet volume creation. However, this is
+	// probably relatively unlikely because we expect the set of zones to be essentially
+	// static for clusters.
+	// Hopefully we can address this problem if/when we do full scheduler integration of
+	// PVC placement (which could also e.g. avoid putting volumes in overloaded or
+	// unhealthy zones)
+	zoneSlice := zones.List()
+	zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]
+
+	glog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice)
+	return zone
+}
diff --git a/plugin/pkg/admission/persistentvolume/label/admission.go b/plugin/pkg/admission/persistentvolume/label/admission.go
index f3b4fd86b51..67648058ed7 100644
--- a/plugin/pkg/admission/persistentvolume/label/admission.go
+++ b/plugin/pkg/admission/persistentvolume/label/admission.go
@@ -25,6 +25,7 @@ import (
 
 	"k8s.io/kubernetes/pkg/admission"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
@@ -159,7 +160,10 @@ func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (m
 		return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
 	}
 
-	labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName)
+	// If the zone is already labeled, honor the hint
+	zone := volume.Labels[unversioned.LabelZoneFailureDomain]
+
+	labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName, zone)
 	if err != nil {
 		return nil, err
 	}
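
Note for reviewers: the zone-choice heuristic added in pkg/volume/util.go is easiest to see in isolation. Below is a minimal standalone sketch of the same idea (FNV-32 hash of the claim-name base, offset by a trailing ordinal, modulo the sorted zone list). The zone names and PVC names are invented for illustration, and the sketch deliberately avoids the kubernetes sets package so it can be run on its own; it is not the code in the patch itself.

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
)

// chooseZone mirrors the heuristic in ChooseZoneForVolume: hash the PVC name
// base with FNV-32, and if the name ends in "-<integer>", add that integer so
// consecutive PetSet ordinals land in consecutive (sorted) zones.
// zones must be non-empty.
func chooseZone(zones []string, pvcName string) string {
	var hash, index uint32

	hashString := pvcName
	if lastDash := strings.LastIndex(pvcName, "-"); lastDash != -1 {
		if id, err := strconv.ParseUint(pvcName[lastDash+1:], 10, 32); err == nil {
			// PetSet-style name: hash only the base, use the ordinal as an offset
			index = uint32(id)
			hashString = pvcName[:lastDash]
		}
	}

	h := fnv.New32()
	h.Write([]byte(hashString))
	hash = h.Sum32()

	// sets.String.List() returns zones in sorted order; emulate that here
	sorted := append([]string(nil), zones...)
	sort.Strings(sorted)
	return sorted[(hash+index)%uint32(len(sorted))]
}

func main() {
	zones := []string{"us-east-1a", "us-east-1b", "us-east-1c"} // hypothetical zones
	for i := 0; i < 4; i++ {
		pvc := fmt.Sprintf("data-web-%d", i) // PetSet-style PVC name: <claim>-<petset>-<ordinal>
		fmt.Printf("%s -> %s\n", pvc, chooseZone(zones, pvc))
	}
}

Because consecutive ordinals only offset a fixed hash, a PetSet's claims walk the sorted zone list in order and wrap around once every zone has been used.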