Merge pull request #27553 from justinsb/pvc_zone_spreading_2

Automatic merge from submit-queue

AWS/GCE: Spread PetSet volume creation across zones, create GCE volumes in non-master zones

Long term we plan on integrating this into the scheduler, but in the
short term we use the volume name to pick a zone for it.

We hash the volume name so we don't bias towards the first few zones.

If the volume name "looks like" a PetSet volume name (ending with
-<number>), then we use the number as an offset. In that case we hash
the base name.
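
For illustration only, here is a minimal standalone sketch of that heuristic. The zone and claim names are invented; the real implementation is the ChooseZoneForVolume helper added to pkg/volume/util.go in the diff below.

// heuristic_sketch.go - illustrative only, not part of this commit.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
)

// chooseZone mirrors the heuristic described above: hash the claim name,
// but if the name ends in -<integer>, hash only the base and add the
// integer as an offset so consecutive claims round-robin across zones.
func chooseZone(zones []string, pvcName string) string {
	sort.Strings(zones) // consistent ordering, like sets.String.List()

	hashString := pvcName
	var index uint32
	if lastDash := strings.LastIndex(pvcName, "-"); lastDash != -1 {
		if id, err := strconv.ParseUint(pvcName[lastDash+1:], 10, 32); err == nil {
			index = uint32(id)
			hashString = pvcName[:lastDash]
		}
	}

	h := fnv.New32()
	h.Write([]byte(hashString))
	return zones[(h.Sum32()+index)%uint32(len(zones))]
}

func main() {
	zones := []string{"us-east-1a", "us-east-1b", "us-east-1c"}
	for i := 0; i < 4; i++ {
		name := fmt.Sprintf("www-web-%d", i)
		fmt.Printf("%s -> %s\n", name, chooseZone(zones, name))
	}
}

Because consecutive ordinals offset the same base-name hash, www-web-0/1/2 land in three different zones regardless of what the base name hashes to.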
k8s-merge-robot 2016-06-22 01:22:16 -07:00 committed by GitHub
commit 07471cf90f
9 changed files with 198 additions and 15 deletions

View File

@@ -48,6 +48,7 @@ import (
 	aws_credentials "k8s.io/kubernetes/pkg/credentialprovider/aws"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/volume"
 )

 const ProviderName = "aws"
@@ -197,6 +198,7 @@ type EC2Metadata interface {
 type VolumeOptions struct {
 	CapacityGB int
 	Tags       map[string]string
+	PVCName    string
 }

 // Volumes is an interface for managing cloud-provisioned volumes
@@ -913,6 +915,49 @@ func (aws *AWSCloud) List(filter string) ([]string, error) {
 	return aws.getInstancesByRegex(filter)
 }
+// getAllZones retrieves a list of all the zones in which nodes are running
+// It currently involves querying all instances
+func (c *AWSCloud) getAllZones() (sets.String, error) {
+	// We don't currently cache this; it is currently used only in volume
+	// creation which is expected to be a comparatively rare occurrence.
+	// TODO: Caching / expose api.Nodes to the cloud provider?
+	// TODO: We could also query for subnets, I think
+	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
+	filters = c.addFilters(filters)
+	request := &ec2.DescribeInstancesInput{
+		Filters: filters,
+	}
+
+	instances, err := c.ec2.DescribeInstances(request)
+	if err != nil {
+		return nil, err
+	}
+	if len(instances) == 0 {
+		return nil, fmt.Errorf("no instances returned")
+	}
+
+	zones := sets.NewString()
+
+	for _, instance := range instances {
+		// Only return fully-ready instances when listing instances
+		// (vs a query by name, where we will return it if we find it)
+		if orEmpty(instance.State.Name) == "pending" {
+			glog.V(2).Infof("Skipping EC2 instance (pending): %s", *instance.InstanceId)
+			continue
+		}
+
+		if instance.Placement != nil {
+			zone := aws.StringValue(instance.Placement.AvailabilityZone)
+			zones.Insert(zone)
+		}
+	}
+
+	glog.V(2).Infof("Found instances in zones %s", zones)
+	return zones, nil
+}
 // GetZone implements Zones.GetZone
 func (c *AWSCloud) GetZone() (cloudprovider.Zone, error) {
 	return cloudprovider.Zone{
@@ -1383,11 +1428,14 @@ func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, e
 	return hostDevicePath, err
 }

-// Implements Volumes.CreateVolume
+// CreateDisk implements Volumes.CreateDisk
 func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
-	// Default to creating in the current zone
-	// TODO: Spread across zones?
-	createAZ := s.selfAWSInstance.availabilityZone
+	allZones, err := s.getAllZones()
+	if err != nil {
+		return "", fmt.Errorf("error querying for all zones: %v", err)
+	}
+	createAZ := volume.ChooseZoneForVolume(allZones, volumeOptions.PVCName)

 	// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
 	request := &ec2.CreateVolumeInput{}

View File

@@ -123,7 +123,9 @@ type Disks interface {
 	// GetAutoLabelsForPD returns labels to apply to PersistentVolume
 	// representing this PD, namely failure domain and zone.
-	GetAutoLabelsForPD(name string) (map[string]string, error)
+	// zone can be provided to specify the zone for the PD,
+	// if empty all managed zones will be searched.
+	GetAutoLabelsForPD(name string, zone string) (map[string]string, error)
 }
 func init() {
@@ -2069,6 +2071,48 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
 	return instances, nil
 }
+// GetAllZones returns all the zones in which nodes are running
+func (gce *GCECloud) GetAllZones() (sets.String, error) {
+	// Fast-path for non-multizone
+	if len(gce.managedZones) == 1 {
+		return sets.NewString(gce.managedZones...), nil
+	}
+
+	// TODO: Caching, but this is currently only called when we are creating a volume,
+	// which is a relatively infrequent operation, and this is only # zones API calls
+	zones := sets.NewString()
+
+	// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
+	for _, zone := range gce.managedZones {
+		// We only retrieve one page in each zone - we only care about existence
+		listCall := gce.service.Instances.List(gce.projectID, zone)
+
+		// No filter: We assume that a zone is either used or unused.
+		// We could only consider running nodes (like we do in List above),
+		// but probably if instances are starting we still want to consider them.
+		// I think we should wait until we have a reason to make the
+		// call one way or the other; we generally can't guarantee correct
+		// volume spreading if the set of zones is changing
+		// (and volume spreading is currently only a heuristic).
+		// Long term we want to replace GetAllZones (which primarily supports volume
+		// spreading) with a scheduler policy that is able to see the global state of
+		// volumes and the health of zones.
+
+		// Just a minimal set of fields - we only care about existence
+		listCall = listCall.Fields("items(name)")
+
+		res, err := listCall.Do()
+		if err != nil {
+			return nil, err
+		}
+		if len(res.Items) != 0 {
+			zones.Insert(zone)
+		}
+	}
+
+	return zones, nil
+}
 func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
 	for _, item := range metadata.Items {
 		if item.Key == key {
@@ -2221,13 +2265,33 @@ func (gce *GCECloud) DeleteDisk(diskToDelete string) error {
 // Builds the labels that should be automatically added to a PersistentVolume backed by a GCE PD
 // Specifically, this builds FailureDomain (zone) and Region labels.
 // The PersistentVolumeLabel admission controller calls this and adds the labels when a PV is created.
-func (gce *GCECloud) GetAutoLabelsForPD(name string) (map[string]string, error) {
-	disk, err := gce.getDiskByNameUnknownZone(name)
-	if err != nil {
-		return nil, err
+// If zone is specified, the volume will only be found in the specified zone,
+// otherwise all managed zones will be searched.
+func (gce *GCECloud) GetAutoLabelsForPD(name string, zone string) (map[string]string, error) {
+	var disk *gceDisk
+	var err error
+	if zone == "" {
+		// We would like as far as possible to avoid this case,
+		// because GCE doesn't guarantee that volumes are uniquely named per region,
+		// just per zone. However, creation of GCE PDs was originally done only
+		// by name, so we have to continue to support that.
+		// However, wherever possible the zone should be passed (and it is passed
+		// for most cases that we can control, e.g. dynamic volume provisioning)
+		disk, err = gce.getDiskByNameUnknownZone(name)
+		if err != nil {
+			return nil, err
+		}
+		zone = disk.Zone
+	} else {
+		// We could assume the disk exists; we have all the information we need.
+		// However it is more consistent to ensure the disk exists,
+		// and in future we may gather additional information (e.g. disk type, IOPS etc)
+		disk, err = gce.getDiskByName(name, zone)
+		if err != nil {
+			return nil, err
+		}
 	}
-	zone := disk.Zone

 	region, err := GetGCERegion(zone)
 	if err != nil {
 		return nil, err

View File

@@ -1131,6 +1131,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
 		CloudTags:   &tags,
 		ClusterName: ctrl.clusterName,
 		PVName:      pvName,
+		PVCName:     claim.Name,
 	}

 	// Provision the volume

View File

@@ -82,6 +82,7 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (strin
 	volumeOptions := &aws.VolumeOptions{
 		CapacityGB: requestGB,
 		Tags:       tags,
+		PVCName:    c.options.PVCName,
 	}

 	name, err := cloud.CreateDisk(volumeOptions)

View File

@@ -359,6 +359,6 @@ func (testcase *testcase) DeleteDisk(diskToDelete string) error {
 	return errors.New("Not implemented")
 }

-func (testcase *testcase) GetAutoLabelsForPD(name string) (map[string]string, error) {
+func (testcase *testcase) GetAutoLabelsForPD(name string, zone string) (map[string]string, error) {
 	return map[string]string{}, errors.New("Not implemented")
 }

View File

@@ -82,20 +82,22 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin
-	// The disk will be created in the zone in which this code is currently running
 	// TODO: We should support auto-provisioning volumes in multiple/specified zones
-	zone, err := cloud.GetZone()
+	zones, err := cloud.GetAllZones()
 	if err != nil {
 		glog.V(2).Infof("error getting zone information from GCE: %v", err)
 		return "", 0, nil, err
 	}

-	err = cloud.CreateDisk(name, zone.FailureDomain, int64(requestGB), *c.options.CloudTags)
+	zone := volume.ChooseZoneForVolume(zones, c.options.PVCName)
+
+	err = cloud.CreateDisk(name, zone, int64(requestGB), *c.options.CloudTags)
 	if err != nil {
 		glog.V(2).Infof("Error creating GCE PD volume: %v", err)
 		return "", 0, nil, err
 	}
 	glog.V(2).Infof("Successfully created GCE PD volume %s", name)

-	labels, err := cloud.GetAutoLabelsForPD(name)
+	labels, err := cloud.GetAutoLabelsForPD(name, zone)
 	if err != nil {
 		// We don't really want to leak the volume here...
 		glog.Errorf("error getting labels for volume %q: %v", name, err)

View File

@@ -49,6 +49,8 @@ type VolumeOptions struct {
 	// PV.Name of the appropriate PersistentVolume. Used to generate cloud
 	// volume name.
 	PVName string
+	// PVC.Name of the PersistentVolumeClaim; only set during dynamic provisioning.
+	PVCName string
 	// Unique name of Kubernetes cluster.
 	ClusterName string
 	// Tags to attach to the real volume in the cloud provider - e.g. AWS EBS

View File

@@ -28,8 +28,13 @@ import (
 	"k8s.io/kubernetes/pkg/watch"

 	"github.com/golang/glog"
+	"hash/fnv"
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/resource"
+	"k8s.io/kubernetes/pkg/util/sets"
+	"math/rand"
+	"strconv"
+	"strings"
 )
 // RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume

@@ -187,3 +192,59 @@ func GenerateVolumeName(clusterName, pvName string, maxLength int) string {
 	}
 	return prefix + "-" + pvName
 }
+// ChooseZoneForVolume implements our heuristics for choosing a zone for volume creation based on the volume name
+// Volumes are generally round-robin-ed across all active zones, using the hash of the PVC Name.
+// However, if the PVCName ends with `-<integer>`, we will hash the prefix, and then add the integer to the hash.
+// This means that a PetSet's volumes (`claimname-petsetname-id`) will spread across available zones,
+// assuming the id values are consecutive.
+func ChooseZoneForVolume(zones sets.String, pvcName string) string {
+	// We create the volume in a zone determined by the name
+	// Eventually the scheduler will coordinate placement into an available zone
+	var hash uint32
+	var index uint32
+
+	if pvcName == "" {
+		// We should always be called with a name; this shouldn't happen
+		glog.Warningf("No name defined during volume create; choosing random zone")
+		hash = rand.Uint32()
+	} else {
+		hashString := pvcName
+
+		// Heuristic to make sure that volumes in a PetSet are spread across zones
+		// PetSet PVCs are (currently) named ClaimName-PetSetName-Id,
+		// where Id is an integer index
+		lastDash := strings.LastIndexByte(pvcName, '-')
+		if lastDash != -1 {
+			petIDString := pvcName[lastDash+1:]
+			petID, err := strconv.ParseUint(petIDString, 10, 32)
+			if err == nil {
+				// Offset by the pet id, so we round-robin across zones
+				index = uint32(petID)
+				// We still hash the volume name, but only the base
+				hashString = pvcName[:lastDash]
+				glog.V(2).Infof("Detected PetSet-style volume name %q; index=%d", pvcName, index)
+			}
+		}
+
+		// We hash the (base) volume name, so we don't bias towards the first N zones
+		h := fnv.New32()
+		h.Write([]byte(hashString))
+		hash = h.Sum32()
+	}
+
+	// Zones.List returns zones in a consistent order (sorted)
+	// We do have a potential failure case where volumes will not be properly spread,
+	// if the set of zones changes during PetSet volume creation. However, this is
+	// probably relatively unlikely because we expect the set of zones to be essentially
+	// static for clusters.
+	// Hopefully we can address this problem if/when we do full scheduler integration of
+	// PVC placement (which could also e.g. avoid putting volumes in overloaded or
+	// unhealthy zones)
+	zoneSlice := zones.List()
+	zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]
+
+	glog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice)
+	return zone
+}
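
A rough usage sketch (not part of this commit): a test along these lines would exercise the spreading behaviour of the new helper. The zone names are invented; the import paths match the packages touched in this diff.

// Illustrative sketch only; zone and claim names are made up.
package volume_test

import (
	"fmt"
	"testing"

	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/volume"
)

func TestChooseZoneForVolumeSpreadsPetSetClaims(t *testing.T) {
	zones := sets.NewString("us-central1-a", "us-central1-b", "us-central1-c")

	seen := sets.NewString()
	for i := 0; i < 3; i++ {
		// PetSet-style claim names: a shared base plus a consecutive ordinal
		seen.Insert(volume.ChooseZoneForVolume(zones, fmt.Sprintf("www-web-%d", i)))
	}

	// Three consecutive ordinals over three zones must cover every zone,
	// regardless of what the base name hashes to.
	if seen.Len() != 3 {
		t.Errorf("expected volumes to spread across 3 zones, got %v", seen.List())
	}
}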

View File

@@ -25,6 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/admission"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
@@ -159,7 +160,10 @@ func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (m
 		return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
 	}

-	labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName)
+	// If the zone is already labeled, honor the hint
+	zone := volume.Labels[unversioned.LabelZoneFailureDomain]
+
+	labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName, zone)
 	if err != nil {
 		return nil, err
 	}