GCE/AWS: Spread PetSet volume creation across zones

Long term we plan to integrate this into the scheduler, but in the
short term we use the volume name to choose a zone to place the volume in.

We hash the volume name so we don't bias towards the first few zones.

If the volume name "looks like" a PetSet volume name (ending with
-<number>) then we use the number as an offset.  In that case we hash
the base name.
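
For illustration, a minimal standalone sketch of that heuristic (this is not the code in this commit; the zone and claim names are invented, and it assumes a sorted list of three zones):

package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"strings"
)

// chooseZone mirrors the heuristic described above: hash the (base) claim
// name with FNV-32, then offset by the trailing ordinal if the name ends
// in -<number>.
func chooseZone(zones []string, pvcName string) string {
	var index uint32
	hashString := pvcName
	if dash := strings.LastIndex(pvcName, "-"); dash != -1 {
		if id, err := strconv.ParseUint(pvcName[dash+1:], 10, 32); err == nil {
			index = uint32(id)          // the ordinal becomes a round-robin offset
			hashString = pvcName[:dash] // hash only the base name
		}
	}
	h := fnv.New32()
	h.Write([]byte(hashString))
	return zones[(h.Sum32()+index)%uint32(len(zones))]
}

func main() {
	zones := []string{"us-east-1a", "us-east-1b", "us-east-1c"}
	for _, pvc := range []string{"www-web-0", "www-web-1", "www-web-2", "mysql-data"} {
		fmt.Printf("%s -> %s\n", pvc, chooseZone(zones, pvc))
	}
}

With these made-up names, www-web-0, www-web-1 and www-web-2 share the hash of the base "www-web" and differ only in the offset, so they land in three different zones, while a name without a trailing number (mysql-data) is hashed in full and maps to a single zone.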

Fixes #27256
Author: Justin Santa Barbara, 2016-06-16 13:36:55 -04:00
Parent: afedd3d34d
Commit: e711cbf912
7 changed files with 138 additions and 6 deletions


@@ -49,6 +49,7 @@ import (
 	aws_credentials "k8s.io/kubernetes/pkg/credentialprovider/aws"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/volume"
 )
 
 const ProviderName = "aws"
@@ -198,6 +199,7 @@ type EC2Metadata interface {
 type VolumeOptions struct {
 	CapacityGB int
 	Tags       map[string]string
+	PVCName    string
 }
 
 // Volumes is an interface for managing cloud-provisioned volumes
@@ -914,6 +916,46 @@ func (aws *AWSCloud) List(filter string) ([]string, error) {
 	return aws.getInstancesByRegex(filter)
 }
 
+// getAllZones retrieves a list of all the zones in which nodes are running
+// It currently involves querying all instances
+func (c *AWSCloud) getAllZones() (sets.String, error) {
+	// TODO: Caching, although we currently only use this in volume creation
+	// TODO: We could also query for subnets, I think
+	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
+	filters = c.addFilters(filters)
+	request := &ec2.DescribeInstancesInput{
+		Filters: filters,
+	}
+
+	instances, err := c.ec2.DescribeInstances(request)
+	if err != nil {
+		return nil, err
+	}
+	if len(instances) == 0 {
+		return nil, fmt.Errorf("no instances returned")
+	}
+
+	zones := sets.NewString()
+
+	for _, instance := range instances {
+		// Only return fully-ready instances when listing instances
+		// (vs a query by name, where we will return it if we find it)
+		if orEmpty(instance.State.Name) == "pending" {
+			glog.V(2).Infof("Skipping EC2 instance (pending): %s", *instance.InstanceId)
+			continue
+		}
+		if instance.Placement != nil {
+			zone := aws.StringValue(instance.Placement.AvailabilityZone)
+			zones.Insert(zone)
+		}
+	}
+
+	glog.V(2).Infof("Found instances in zones %s", zones)
+	return zones, nil
+}
+
 // GetZone implements Zones.GetZone
 func (c *AWSCloud) GetZone() (cloudprovider.Zone, error) {
 	return cloudprovider.Zone{
@@ -1387,11 +1429,14 @@ func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, e
 	return hostDevicePath, err
 }
 
-// Implements Volumes.CreateVolume
+// CreateDisk implements Volumes.CreateDisk
 func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
-	// Default to creating in the current zone
-	// TODO: Spread across zones?
-	createAZ := s.selfAWSInstance.availabilityZone
+	allZones, err := s.getAllZones()
+	if err != nil {
+		return "", fmt.Errorf("error querying for all zones: %v", err)
+	}
+
+	createAZ := volume.ChooseZoneForVolume(allZones, volumeOptions.PVCName)
 
 	// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
 	request := &ec2.CreateVolumeInput{}


@@ -2066,6 +2066,38 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
 	return instances, nil
 }
 
+// GetAllZones returns all the zones in which nodes are running
+func (gce *GCECloud) GetAllZones() (sets.String, error) {
+	if len(gce.managedZones) == 1 {
+		return sets.NewString(gce.managedZones...), nil
+	}
+
+	// TODO: Caching, but this is currently only called when we are creating a volume,
+	// which is a relatively infrequent operation, and this is only # zones API calls
+	zones := sets.NewString()
+
+	// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
+	for _, zone := range gce.managedZones {
+		// We only retrieve one page in each zone - we only care about existence
+		listCall := gce.service.Instances.List(gce.projectID, zone)
+
+		// No filter: We assume that a zone is either used or unused
+		// Just a minimal set of fields - we only care about existence
+		listCall = listCall.Fields("items(name)")
+
+		res, err := listCall.Do()
+		if err != nil {
+			return nil, err
+		}
+		if len(res.Items) != 0 {
+			zones.Insert(zone)
+		}
+	}
+
+	return zones, nil
+}
+
 func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
 	for _, item := range metadata.Items {
 		if item.Key == key {


@@ -1125,6 +1125,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
 		CloudTags:   &tags,
 		ClusterName: ctrl.clusterName,
 		PVName:      pvName,
+		PVCName:     claim.Name,
 	}
 
 	// Provision the volume


@@ -82,6 +82,7 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (strin
 	volumeOptions := &aws.VolumeOptions{
 		CapacityGB: requestGB,
 		Tags:       tags,
+		PVCName:    c.options.PVCName,
 	}
 
 	name, err := cloud.CreateDisk(volumeOptions)


@@ -82,13 +82,15 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin
 	// The disk will be created in the zone in which this code is currently running
 	// TODO: We should support auto-provisioning volumes in multiple/specified zones
-	zone, err := cloud.GetZone()
+	zones, err := cloud.GetAllZones()
 	if err != nil {
 		glog.V(2).Infof("error getting zone information from GCE: %v", err)
 		return "", 0, nil, err
 	}
 
-	err = cloud.CreateDisk(name, zone.FailureDomain, int64(requestGB), *c.options.CloudTags)
+	zone := volume.ChooseZoneForVolume(zones, c.options.PVCName)
+
+	err = cloud.CreateDisk(name, zone, int64(requestGB), *c.options.CloudTags)
 	if err != nil {
 		glog.V(2).Infof("Error creating GCE PD volume: %v", err)
 		return "", 0, nil, err


@@ -49,6 +49,8 @@ type VolumeOptions struct {
 	// PV.Name of the appropriate PersistentVolume. Used to generate cloud
 	// volume name.
 	PVName string
+	// PVC.Name of the PersistentVolumeClaim, if we are auto-provisioning.
+	PVCName string
 	// Unique name of Kubernetes cluster.
 	ClusterName string
 	// Tags to attach to the real volume in the cloud provider - e.g. AWS EBS


@@ -28,8 +28,13 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 
 	"github.com/golang/glog"
 
+	"hash/fnv"
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/resource"
+	"k8s.io/kubernetes/pkg/util/sets"
+	"math/rand"
+	"strconv"
+	"strings"
 )
 
 // RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
@@ -187,3 +192,47 @@ func GenerateVolumeName(clusterName, pvName string, maxLength int) string {
 	}
 	return prefix + "-" + pvName
 }
+
+// ChooseZoneForVolume implements our heuristics for choosing a zone for volume creation based on the volume name
+func ChooseZoneForVolume(zones sets.String, pvcName string) string {
+	// We create the volume in a zone determined by the name
+	// Eventually the scheduler will coordinate placement into an available zone
+	var hash uint32
+	var index uint32
+
+	if pvcName == "" {
+		// We should always be called with a name; this shouldn't happen
+		glog.Warningf("No Name defined during volume create; choosing random zone")
+		hash = rand.Uint32()
+	} else {
+		hashString := pvcName
+
+		// Heuristic to make sure that volumes in a PetSet are spread across zones
+		// PetSet PVCs are (currently) named ClaimName-PetSetName-Id,
+		// where Id is an integer index
+		lastDash := strings.LastIndexByte(pvcName, '-')
+		if lastDash != -1 {
+			petIDString := pvcName[lastDash+1:]
+			petID, err := strconv.ParseUint(petIDString, 10, 32)
+			if err == nil {
+				// Offset by the pet id, so we round-robin across zones
+				index = uint32(petID)
+				// We still hash the volume name, but only the base
+				hashString = pvcName[:lastDash]
+				glog.V(2).Infof("Detected PetSet-style volume name %q; index=%d", pvcName, index)
+			}
+		}
+
+		// We hash the (base) volume name, so we don't bias towards the first N zones
+		h := fnv.New32()
+		h.Write([]byte(hashString))
+		hash = h.Sum32()
+	}
+
+	zoneSlice := zones.List()
+	zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]
+
+	glog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice)
+	return zone
+}