Ubernetes Lite GCE: Support multiple zones in GCE cloud provider

We adapt the existing code to work across all zones in a region.

We require a feature-flag to enable Ubernetes-Lite

Reasons:

* There are some behavioural changes if users create volumes with
the same name in two zones.
* We don't want to make one API call per zone if we're not running
Ubernetes-Lite.
* Ubernetes-Lite is still experimental.

There isn't a parallel flag implemented for AWS, because at the moment
there would be no behaviour changes from this.
This commit is contained in:
Justin Santa Barbara 2015-11-29 14:40:09 -05:00
parent 2958ea253a
commit 43cbfb74fe
8 changed files with 420 additions and 128 deletions

View File

@ -586,22 +586,42 @@ grains:
- kubernetes-master
cloud: gce
EOF
if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]] && ! [[ -z "${TOKEN_BODY:-}" ]] && ! [[ -z "${NODE_NETWORK:-}" ]] ; then
cat <<EOF >/etc/gce.conf
cat <<EOF >/etc/gce.conf
[global]
EOF
CLOUD_CONFIG='' # Set to non-empty path if we are using the gce.conf file
if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]] && ! [[ -z "${TOKEN_BODY:-}" ]] && ! [[ -z "${NODE_NETWORK:-}" ]] ; then
cat <<EOF >>/etc/gce.conf
token-url = ${TOKEN_URL}
token-body = ${TOKEN_BODY}
project-id = ${PROJECT_ID}
network-name = ${NODE_NETWORK}
EOF
CLOUD_CONFIG=/etc/gce.conf
EXTERNAL_IP=$(curl --fail --silent -H 'Metadata-Flavor: Google' "http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip")
cat <<EOF >>/etc/salt/minion.d/grains.conf
cloud_config: /etc/gce.conf
advertise_address: '${EXTERNAL_IP}'
proxy_ssh_user: '${PROXY_SSH_USER}'
EOF
fi
if [[ -n "${MULTIZONE:-}" ]]; then
cat <<EOF >>/etc/gce.conf
multizone = ${MULTIZONE}
EOF
CLOUD_CONFIG=/etc/gce.conf
fi
if [[ -n ${CLOUD_CONFIG:-} ]]; then
cat <<EOF >>/etc/salt/minion.d/grains.conf
cloud_config: ${CLOUD_CONFIG}
EOF
else
rm -f /etc/gce.conf
fi
# If the kubelet on the master is enabled, give it the same CIDR range
# as a generic node.
if [[ ! -z "${KUBELET_APISERVER:-}" ]] && [[ ! -z "${KUBELET_CERT:-}" ]] && [[ ! -z "${KUBELET_KEY:-}" ]]; then

View File

@ -1411,6 +1411,7 @@ OPENCONTRAIL_PUBLIC_SUBNET: $(yaml-quote ${OPENCONTRAIL_PUBLIC_SUBNET:-})
E2E_STORAGE_TEST_ENVIRONMENT: $(yaml-quote ${E2E_STORAGE_TEST_ENVIRONMENT:-})
KUBE_IMAGE_TAG: $(yaml-quote ${KUBE_IMAGE_TAG:-})
KUBE_DOCKER_REGISTRY: $(yaml-quote ${KUBE_DOCKER_REGISTRY:-})
MULTIZONE: $(yaml-quote ${MULTIZONE:-})
EOF
if [ -n "${KUBELET_PORT:-}" ]; then
cat >>$file <<EOF

View File

@ -6,7 +6,7 @@ cluster/aws/templates/salt-minion.sh:# We set the hostname_override to the full
cluster/centos/util.sh: local node_ip=${node#*@}
cluster/gce/configure-vm.sh: advertise_address: '${EXTERNAL_IP}'
cluster/gce/configure-vm.sh: api_servers: '${KUBERNETES_MASTER_NAME}'
cluster/gce/configure-vm.sh: cloud_config: /etc/gce.conf
cluster/gce/configure-vm.sh: cloud_config: ${CLOUD_CONFIG}
cluster/gce/configure-vm.sh: runtime_config: '$(echo "$RUNTIME_CONFIG" | sed -e "s/'/''/g")'
cluster/gce/coreos/helper.sh:# cloud_config yaml file should be passed
cluster/gce/util.sh: local node_ip=$(gcloud compute instances describe --project "${PROJECT}" --zone "${ZONE}" \

View File

@ -65,7 +65,9 @@ type GCECloud struct {
service *compute.Service
containerService *container.Service
projectID string
zone string
region string
localZone string // The zone in which we are runniing
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
networkURL string
useMetadataServer bool
}
@ -76,6 +78,7 @@ type Config struct {
TokenBody string `gcfg:"token-body"`
ProjectID string `gcfg:"project-id"`
NetworkName string `gcfg:"network-name"`
Multizone bool `gcfg:"multizone"`
}
}
@ -145,6 +148,27 @@ func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, e
return networkList.Items[0].Name, nil
}
func getZonesForRegion(svc *compute.Service, projectID, region string) ([]string, error) {
listCall := svc.Zones.List(projectID)
// Filtering by region doesn't seem to work
// (tested in https://cloud.google.com/compute/docs/reference/latest/zones/list)
// listCall = listCall.Filter("region eq " + region)
res, err := listCall.Do()
if err != nil {
return nil, fmt.Errorf("unexpected response listing zones: %v", err)
}
zones := []string{}
for _, zone := range res.Items {
regionName := lastComponent(zone.Region)
if regionName == region {
zones = append(zones, zone.Name)
}
}
return zones, nil
}
// newGCECloud creates a new instance of GCECloud.
func newGCECloud(config io.Reader) (*GCECloud, error) {
projectID, zone, err := getProjectAndZone()
@ -152,12 +176,20 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
return nil, err
}
region, err := GetGCERegion(zone)
if err != nil {
return nil, err
}
networkName, err := getNetworkNameViaMetadata()
if err != nil {
return nil, err
}
networkURL := gceNetworkURL(projectID, networkName)
// By default, Kubernetes clusters only run against one zone
managedZones := []string{zone}
tokenSource := google.ComputeTokenSource("")
if config != nil {
var cfg Config
@ -178,15 +210,19 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
if cfg.Global.TokenURL != "" {
tokenSource = newAltTokenSource(cfg.Global.TokenURL, cfg.Global.TokenBody)
}
if cfg.Global.Multizone {
managedZones = nil // Use all zones in region
}
}
return CreateGCECloud(projectID, zone, networkURL, tokenSource, true /* useMetadataServer */)
return CreateGCECloud(projectID, region, zone, managedZones, networkURL, tokenSource, true /* useMetadataServer */)
}
// Creates a GCECloud object using the specified parameters.
// If no networkUrl is specified, loads networkName via rest call.
// If no tokenSource is specified, uses oauth2.DefaultTokenSource.
func CreateGCECloud(projectID, zone, networkURL string, tokenSource oauth2.TokenSource, useMetadataServer bool) (*GCECloud, error) {
// If managedZones is nil / empty all zones in the region will be managed.
func CreateGCECloud(projectID, region, zone string, managedZones []string, networkURL string, tokenSource oauth2.TokenSource, useMetadataServer bool) (*GCECloud, error) {
if tokenSource == nil {
var err error
tokenSource, err = google.DefaultTokenSource(
@ -220,11 +256,23 @@ func CreateGCECloud(projectID, zone, networkURL string, tokenSource oauth2.Token
networkURL = gceNetworkURL(projectID, networkName)
}
if len(managedZones) == 0 {
managedZones, err = getZonesForRegion(svc, projectID, region)
if err != nil {
return nil, err
}
}
if len(managedZones) != 1 {
glog.Infof("managing multiple zones: %v", managedZones)
}
return &GCECloud{
service: svc,
containerService: containerSvc,
projectID: projectID,
zone: zone,
region: region,
localZone: zone,
managedZones: managedZones,
networkURL: networkURL,
useMetadataServer: useMetadataServer,
}, nil
@ -279,9 +327,8 @@ func makeHostURL(projectID, zone, host string) string {
projectID, zone, host)
}
func makeComparableHostPath(zone, host string) string {
host = canonicalizeInstanceName(host)
return fmt.Sprintf("/zones/%s/instances/%s", zone, host)
func (h *gceInstance) makeComparableHostPath() string {
return fmt.Sprintf("/zones/%s/instances/%s", h.Zone, h.Name)
}
func hostURLToComparablePath(hostURL string) string {
@ -344,9 +391,9 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error
})
}
func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error {
return waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, operationName).Do()
return gce.service.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
})
}
@ -377,13 +424,18 @@ func isHTTPErrorCode(err error, code int) bool {
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, ports, hosts)
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, ports, hostNames)
if len(hosts) == 0 {
if len(hostNames) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return nil, err
}
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(name, region, requestedIP, ports)
if err != nil {
@ -651,10 +703,10 @@ func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports
return nil
}
func (gce *GCECloud) createTargetPool(name, region string, hosts []string, affinityType api.ServiceAffinity) error {
func (gce *GCECloud) createTargetPool(name, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, gce.zone, host))
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
pool := &compute.TargetPool{
Name: name,
@ -674,7 +726,7 @@ func (gce *GCECloud) createTargetPool(name, region string, hosts []string, affin
return nil
}
func (gce *GCECloud) createFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error {
func (gce *GCECloud) createFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, ipAddress, ports, hosts)
if err != nil {
return err
@ -692,7 +744,7 @@ func (gce *GCECloud) createFirewall(name, region, ipAddress string, ports []*api
return nil
}
func (gce *GCECloud) updateFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error {
func (gce *GCECloud) updateFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, ipAddress, ports, hosts)
if err != nil {
return err
@ -710,7 +762,7 @@ func (gce *GCECloud) updateFirewall(name, region, ipAddress string, ports []*api
return nil
}
func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) (*compute.Firewall, error) {
func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
@ -739,32 +791,41 @@ func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api
// * The longest tag that is a prefix of the instance name is used
// * If any instance has a prefix tag, all instances must
// * If no instances have a prefix tag, no tags are used
func (gce *GCECloud) computeHostTags(hosts []string) ([]string, error) {
listCall := gce.service.Instances.List(gce.projectID, gce.zone)
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(hosts, "|") + ")")
// Add the fields we want
listCall = listCall.Fields("items(name,tags)")
res, err := listCall.Do()
if err != nil {
return nil, err
func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
// TODO: We could store the tags in gceInstance, so we could have already fetched it
hostNamesByZone := make(map[string][]string)
for _, host := range hosts {
hostNamesByZone[host.Zone] = append(hostNamesByZone[host.Zone], host.Name)
}
tags := sets.NewString()
for _, instance := range res.Items {
longest_tag := ""
for _, tag := range instance.Tags.Items {
if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
longest_tag = tag
}
for zone, hostNames := range hostNamesByZone {
listCall := gce.service.Instances.List(gce.projectID, zone)
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(hostNames, "|") + ")")
// Add the fields we want
listCall = listCall.Fields("items(name,tags)")
res, err := listCall.Do()
if err != nil {
return nil, err
}
if len(longest_tag) > 0 {
tags.Insert(longest_tag)
} else if len(tags) > 0 {
return nil, fmt.Errorf("Some, but not all, instances have prefix tags (%s is missing)", instance.Name)
for _, instance := range res.Items {
longest_tag := ""
for _, tag := range instance.Tags.Items {
if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
longest_tag = tag
}
}
if len(longest_tag) > 0 {
tags.Insert(longest_tag)
} else if len(tags) > 0 {
return nil, fmt.Errorf("Some, but not all, instances have prefix tags (%s is missing)", instance.Name)
}
}
}
@ -818,7 +879,12 @@ func (gce *GCECloud) createOrPromoteStaticIP(name, region, existingIP string) (i
}
// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(name, region string, hosts []string) error {
func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string) error {
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return err
}
pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
if err != nil {
return err
@ -831,7 +897,7 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hosts []string) err
var toAdd []*compute.InstanceReference
var toRemove []*compute.InstanceReference
for _, host := range hosts {
link := makeComparableHostPath(gce.zone, host)
link := host.makeComparableHostPath()
if !existing.Has(link) {
toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
}
@ -1216,52 +1282,52 @@ func (gce *GCECloud) ListHttpHealthChecks() (*compute.HttpHealthCheckList, error
// InstanceGroup Management
// CreateInstanceGroup creates an instance group with the given instances. It is the callers responsibility to add named ports.
func (gce *GCECloud) CreateInstanceGroup(name string) (*compute.InstanceGroup, error) {
func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
op, err := gce.service.InstanceGroups.Insert(
gce.projectID, gce.zone, &compute.InstanceGroup{Name: name}).Do()
gce.projectID, zone, &compute.InstanceGroup{Name: name}).Do()
if err != nil {
return nil, err
}
if err = gce.waitForZoneOp(op); err != nil {
if err = gce.waitForZoneOp(op, zone); err != nil {
return nil, err
}
return gce.GetInstanceGroup(name)
return gce.GetInstanceGroup(name, zone)
}
// DeleteInstanceGroup deletes an instance group.
func (gce *GCECloud) DeleteInstanceGroup(name string) error {
func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
op, err := gce.service.InstanceGroups.Delete(
gce.projectID, gce.zone, name).Do()
gce.projectID, zone, name).Do()
if err != nil {
return err
}
return gce.waitForZoneOp(op)
return gce.waitForZoneOp(op, zone)
}
// ListInstanceGroups lists all InstanceGroups in the project and zone.
func (gce *GCECloud) ListInstanceGroups() (*compute.InstanceGroupList, error) {
return gce.service.InstanceGroups.List(gce.projectID, gce.zone).Do()
func (gce *GCECloud) ListInstanceGroups(zone string) (*compute.InstanceGroupList, error) {
return gce.service.InstanceGroups.List(gce.projectID, zone).Do()
}
// ListInstancesInInstanceGroup lists all the instances in a given istance group and state.
func (gce *GCECloud) ListInstancesInInstanceGroup(name string, state string) (*compute.InstanceGroupsListInstances, error) {
// ListInstancesInInstanceGroup lists all the instances in a given instance group and state.
func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) (*compute.InstanceGroupsListInstances, error) {
return gce.service.InstanceGroups.ListInstances(
gce.projectID, gce.zone, name,
gce.projectID, zone, name,
&compute.InstanceGroupsListInstancesRequest{InstanceState: state}).Do()
}
// AddInstancesToInstanceGroup adds the given instances to the given instance group.
func (gce *GCECloud) AddInstancesToInstanceGroup(name string, instanceNames []string) error {
func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceNames []string) error {
if len(instanceNames) == 0 {
return nil
}
// Adding the same instance twice will result in a 4xx error
instances := []*compute.InstanceReference{}
for _, ins := range instanceNames {
instances = append(instances, &compute.InstanceReference{Instance: makeHostURL(gce.projectID, gce.zone, ins)})
instances = append(instances, &compute.InstanceReference{Instance: makeHostURL(gce.projectID, zone, ins)})
}
op, err := gce.service.InstanceGroups.AddInstances(
gce.projectID, gce.zone, name,
gce.projectID, zone, name,
&compute.InstanceGroupsAddInstancesRequest{
Instances: instances,
}).Do()
@ -1269,21 +1335,21 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, instanceNames []st
if err != nil {
return err
}
return gce.waitForZoneOp(op)
return gce.waitForZoneOp(op, zone)
}
// RemoveInstancesFromInstanceGroup removes the given instances from the instance group.
func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, instanceNames []string) error {
func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceNames []string) error {
if len(instanceNames) == 0 {
return nil
}
instances := []*compute.InstanceReference{}
for _, ins := range instanceNames {
instanceLink := makeHostURL(gce.projectID, gce.zone, ins)
instanceLink := makeHostURL(gce.projectID, zone, ins)
instances = append(instances, &compute.InstanceReference{Instance: instanceLink})
}
op, err := gce.service.InstanceGroups.RemoveInstances(
gce.projectID, gce.zone, name,
gce.projectID, zone, name,
&compute.InstanceGroupsRemoveInstancesRequest{
Instances: instances,
}).Do()
@ -1294,7 +1360,7 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, instanceNames
}
return err
}
return gce.waitForZoneOp(op)
return gce.waitForZoneOp(op, zone)
}
// AddPortToInstanceGroup adds a port to the given instance group.
@ -1309,21 +1375,21 @@ func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int6
namedPort := compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port}
ig.NamedPorts = append(ig.NamedPorts, &namedPort)
op, err := gce.service.InstanceGroups.SetNamedPorts(
gce.projectID, gce.zone, ig.Name,
gce.projectID, ig.Zone, ig.Name,
&compute.InstanceGroupsSetNamedPortsRequest{
NamedPorts: ig.NamedPorts}).Do()
if err != nil {
return nil, err
}
if err = gce.waitForZoneOp(op); err != nil {
if err = gce.waitForZoneOp(op, ig.Zone); err != nil {
return nil, err
}
return &namedPort, nil
}
// GetInstanceGroup returns an instance group by name.
func (gce *GCECloud) GetInstanceGroup(name string) (*compute.InstanceGroup, error) {
return gce.service.InstanceGroups.Get(gce.projectID, gce.zone, name).Do()
func (gce *GCECloud) GetInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
return gce.service.InstanceGroups.Get(gce.projectID, zone, name).Do()
}
// Take a GCE instance 'hostname' and break it down to something that can be fed
@ -1337,20 +1403,6 @@ func canonicalizeInstanceName(name string) string {
return name
}
// Return the instances matching the relevant name.
func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) {
name = canonicalizeInstanceName(name)
res, err := gce.service.Instances.Get(gce.projectID, gce.zone, name).Do()
if err != nil {
glog.Errorf("Failed to retrieve TargetInstance resource for instance: %s", name)
if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == http.StatusNotFound {
return nil, cloudprovider.InstanceNotFound
}
return nil, err
}
return res, nil
}
// Implementation of Instances.CurrentNodeName
func (gce *GCECloud) CurrentNodeName(hostname string) (string, error) {
return hostname, nil
@ -1446,27 +1498,34 @@ func (gce *GCECloud) ExternalID(instance string) (string, error) {
if err != nil {
return "", err
}
return strconv.FormatUint(inst.Id, 10), nil
return strconv.FormatUint(inst.ID, 10), nil
}
// InstanceID returns the cloud provider ID of the specified instance.
func (gce *GCECloud) InstanceID(instance string) (string, error) {
return gce.projectID + "/" + gce.zone + "/" + canonicalizeInstanceName(instance), nil
func (gce *GCECloud) InstanceID(instanceName string) (string, error) {
instance, err := gce.getInstanceByName(instanceName)
if err != nil {
return "", err
}
return gce.projectID + "/" + instance.Zone + "/" + instance.Name, nil
}
// List is an implementation of Instances.List.
func (gce *GCECloud) List(filter string) ([]string, error) {
listCall := gce.service.Instances.List(gce.projectID, gce.zone)
if len(filter) > 0 {
listCall = listCall.Filter("name eq " + filter)
}
res, err := listCall.Do()
if err != nil {
return nil, err
}
var instances []string
for _, instance := range res.Items {
instances = append(instances, instance.Name)
// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
for _, zone := range gce.managedZones {
listCall := gce.service.Instances.List(gce.projectID, zone)
if len(filter) > 0 {
listCall = listCall.Filter("name eq " + filter)
}
res, err := listCall.Do()
if err != nil {
return nil, err
}
for _, instance := range res.Items {
instances = append(instances, instance.Name)
}
}
return instances, nil
}
@ -1524,11 +1583,14 @@ func gceNetworkURL(project, network string) string {
func (gce *GCECloud) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
routeName := truncateClusterName(clusterName) + "-" + nameHint
instanceName := canonicalizeInstanceName(route.TargetInstance)
targetInstance, err := gce.getInstanceByName(route.TargetInstance)
if err != nil {
return err
}
insertOp, err := gce.service.Routes.Insert(gce.projectID, &compute.Route{
Name: routeName,
DestRange: route.DestinationCIDR,
NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", gce.zone, instanceName),
NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", targetInstance.Zone, targetInstance.Name),
Network: gce.networkURL,
Priority: 1000,
Description: k8sNodeRouteTag,
@ -1548,40 +1610,46 @@ func (gce *GCECloud) DeleteRoute(clusterName string, route *cloudprovider.Route)
}
func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
region, err := getGceRegion(gce.zone)
if err != nil {
return cloudprovider.Zone{}, err
}
return cloudprovider.Zone{
FailureDomain: gce.zone,
Region: region,
FailureDomain: gce.localZone,
Region: gce.region,
}, nil
}
func (gce *GCECloud) CreateDisk(name string, sizeGb int64) error {
// Create a new Persistent Disk, with the specified name & size, in the specified zone.
func (gce *GCECloud) CreateDisk(name string, zone string, sizeGb int64) error {
diskToCreate := &compute.Disk{
Name: name,
SizeGb: sizeGb,
}
createOp, err := gce.service.Disks.Insert(gce.projectID, gce.zone, diskToCreate).Do()
createOp, err := gce.service.Disks.Insert(gce.projectID, zone, diskToCreate).Do()
if err != nil {
return err
}
return gce.waitForZoneOp(createOp)
return gce.waitForZoneOp(createOp, zone)
}
func (gce *GCECloud) DeleteDisk(diskToDelete string) error {
deleteOp, err := gce.service.Disks.Delete(gce.projectID, gce.zone, diskToDelete).Do()
disk, err := gce.getDiskByNameUnknownZone(diskToDelete)
if err != nil {
return err
}
return gce.waitForZoneOp(deleteOp)
deleteOp, err := gce.service.Disks.Delete(gce.projectID, disk.Zone, disk.Name).Do()
if err != nil {
return err
}
return gce.waitForZoneOp(deleteOp, disk.Zone)
}
func (gce *GCECloud) AttachDisk(diskName, instanceID string, readOnly bool) error {
disk, err := gce.getDisk(diskName)
instance, err := gce.getInstanceByName(instanceID)
if err != nil {
return fmt.Errorf("error getting instance %q", instanceID)
}
disk, err := gce.getDiskByName(diskName, instance.Zone)
if err != nil {
return err
}
@ -1591,25 +1659,30 @@ func (gce *GCECloud) AttachDisk(diskName, instanceID string, readOnly bool) erro
}
attachedDisk := gce.convertDiskToAttachedDisk(disk, readWrite)
attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, gce.zone, instanceID, attachedDisk).Do()
attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, disk.Zone, instanceID, attachedDisk).Do()
if err != nil {
return err
}
return gce.waitForZoneOp(attachOp)
return gce.waitForZoneOp(attachOp, disk.Zone)
}
func (gce *GCECloud) DetachDisk(devicePath, instanceID string) error {
detachOp, err := gce.service.Instances.DetachDisk(gce.projectID, gce.zone, instanceID, devicePath).Do()
inst, err := gce.getInstanceByName(instanceID)
if err != nil {
return fmt.Errorf("error getting instance %q", instanceID)
}
detachOp, err := gce.service.Instances.DetachDisk(gce.projectID, inst.Zone, inst.Name, devicePath).Do()
if err != nil {
return err
}
return gce.waitForZoneOp(detachOp)
return gce.waitForZoneOp(detachOp, inst.Zone)
}
func (gce *GCECloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceID).Do()
instance, err := gce.getInstanceByName(instanceID)
if err != nil {
return false, err
}
@ -1624,15 +1697,62 @@ func (gce *GCECloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
return false, nil
}
func (gce *GCECloud) getDisk(diskName string) (*compute.Disk, error) {
return gce.service.Disks.Get(gce.projectID, gce.zone, diskName).Do()
// Returns a gceDisk for the disk, if it is found in the specified zone.
// If not found, returns (nil, nil)
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) {
disk, err := gce.service.Disks.Get(gce.projectID, zone, diskName).Do()
if err == nil {
d := &gceDisk{
Zone: lastComponent(disk.Zone),
Name: disk.Name,
Kind: disk.Kind,
}
return d, nil
}
if !isHTTPErrorCode(err, http.StatusNotFound) {
return nil, err
}
return nil, nil
}
// getGceRegion returns region of the gce zone. Zone names
// Like findDiskByName, but returns an error if the disk is not found
func (gce *GCECloud) getDiskByName(diskName string, zone string) (*gceDisk, error) {
disk, err := gce.findDiskByName(diskName, zone)
if disk == nil && err == nil {
return nil, fmt.Errorf("GCE persistent disk not found: %q", diskName)
}
return disk, err
}
// Scans all managed zones to return the GCE PD
// Prefer getDiskByName, if the zone can be established
func (gce *GCECloud) getDiskByNameUnknownZone(diskName string) (*gceDisk, error) {
// Note: this is the gotcha right now with GCE PD support:
// disk names are not unique per-region.
// (I can create two volumes with name "myvol" in e.g. us-central1-b & us-central1-f)
// For now, this is simply undefined behvaiour.
//
// In future, we will have to require users to qualify their disk
// "us-central1-a/mydisk". We could do this for them as part of
// admission control, but that might be a little weird (values changing
// on create)
for _, zone := range gce.managedZones {
disk, err := gce.findDiskByName(diskName, zone)
if err != nil {
return nil, err
}
if disk != nil {
return disk, nil
}
}
return nil, fmt.Errorf("GCE persistent disk not found: %q", diskName)
}
// GetGCERegion returns region of the gce zone. Zone names
// are of the form: ${region-name}-${ix}.
// For example "us-central1-b" has a region of "us-central1".
// So we look for the last '-' and trim to just before that.
func getGceRegion(zone string) (string, error) {
func GetGCERegion(zone string) (string, error) {
ix := strings.LastIndex(zone, "-")
if ix == -1 {
return "", fmt.Errorf("unexpected zone: %s", zone)
@ -1641,18 +1761,18 @@ func getGceRegion(zone string) (string, error) {
}
// Converts a Disk resource to an AttachedDisk resource.
func (gce *GCECloud) convertDiskToAttachedDisk(disk *compute.Disk, readWrite string) *compute.AttachedDisk {
func (gce *GCECloud) convertDiskToAttachedDisk(disk *gceDisk, readWrite string) *compute.AttachedDisk {
return &compute.AttachedDisk{
DeviceName: disk.Name,
Kind: disk.Kind,
Mode: readWrite,
Source: "https://" + path.Join("www.googleapis.com/compute/v1/projects/", gce.projectID, "zones", gce.zone, "disks", disk.Name),
Source: "https://" + path.Join("www.googleapis.com/compute/v1/projects/", gce.projectID, "zones", disk.Zone, "disks", disk.Name),
Type: "PERSISTENT",
}
}
func (gce *GCECloud) ListClusters() ([]string, error) {
list, err := gce.containerService.Projects.Zones.Clusters.List(gce.projectID, gce.zone).Do()
func (gce *GCECloud) listClustersInZone(zone string) ([]string, error) {
list, err := gce.containerService.Projects.Zones.Clusters.List(gce.projectID, zone).Do()
if err != nil {
return nil, err
}
@ -1663,6 +1783,129 @@ func (gce *GCECloud) ListClusters() ([]string, error) {
return result, nil
}
func (gce *GCECloud) ListClusters() ([]string, error) {
allClusters := []string{}
for _, zone := range gce.managedZones {
clusters, err := gce.listClustersInZone(zone)
if err != nil {
return nil, err
}
// TODO: Scoping? Do we need to qualify the cluster name?
allClusters = append(allClusters, clusters...)
}
return allClusters, nil
}
func (gce *GCECloud) Master(clusterName string) (string, error) {
return "k8s-" + clusterName + "-master.internal", nil
}
type gceInstance struct {
Zone string
Name string
ID uint64
Disks []*compute.AttachedDisk
}
type gceDisk struct {
Zone string
Name string
Kind string
}
func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error) {
instances := make(map[string]*gceInstance)
for _, name := range names {
name = canonicalizeInstanceName(name)
instances[name] = nil
}
for _, zone := range gce.managedZones {
var remaining []string
for name, instance := range instances {
if instance == nil {
remaining = append(remaining, name)
}
}
if len(remaining) == 0 {
break
}
listCall := gce.service.Instances.List(gce.projectID, zone)
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(remaining, "|") + ")")
listCall = listCall.Fields("items(name,id,disks)")
res, err := listCall.Do()
if err != nil {
return nil, err
}
for _, i := range res.Items {
name := i.Name
instance := &gceInstance{
Zone: zone,
Name: name,
ID: i.Id,
Disks: i.Disks,
}
instances[name] = instance
}
}
instanceArray := make([]*gceInstance, len(names))
for i, name := range names {
instance := instances[name]
if instance == nil {
return nil, fmt.Errorf("failed to retrieve instance: %q", name)
}
instanceArray[i] = instances[name]
}
return instanceArray, nil
}
func (gce *GCECloud) getInstanceByName(name string) (*gceInstance, error) {
// Avoid changing behaviour when not managing multiple zones
if len(gce.managedZones) == 1 {
name = canonicalizeInstanceName(name)
zone := gce.managedZones[0]
res, err := gce.service.Instances.Get(gce.projectID, zone, name).Do()
if err != nil {
if !isHTTPErrorCode(err, http.StatusNotFound) {
return nil, err
}
}
return &gceInstance{
Zone: lastComponent(res.Zone),
Name: res.Name,
ID: res.Id,
Disks: res.Disks,
}, nil
}
instances, err := gce.getInstancesByNames([]string{name})
if err != nil {
return nil, err
}
if len(instances) != 1 || instances[0] == nil {
return nil, fmt.Errorf("unexpected return value from getInstancesByNames: %v", instances)
}
return instances[0], nil
}
// Returns the last component of a URL, i.e. anything after the last slash
// If there is no slash, returns the whole string
func lastComponent(s string) string {
lastSlash := strings.LastIndex(s, "/")
if lastSlash != -1 {
s = s[lastSlash+1:]
}
return s
}

View File

@ -22,8 +22,17 @@ import (
)
func TestGetRegion(t *testing.T) {
zoneName := "us-central1-b"
regionName, err := GetGCERegion(zoneName)
if err != nil {
t.Fatalf("unexpected error from GetGCERegion: %v", err)
}
if regionName != "us-central1" {
t.Errorf("Unexpected region from GetGCERegion: %s", regionName)
}
gce := &GCECloud{
zone: "us-central1-b",
localZone: zoneName,
region: regionName,
}
zones, ok := gce.Zones()
if !ok {
@ -91,7 +100,11 @@ func TestComparingHostURLs(t *testing.T) {
for _, test := range tests {
link1 := hostURLToComparablePath(test.host1)
link2 := makeComparableHostPath(test.zone, test.name)
testInstance := &gceInstance{
Name: canonicalizeInstanceName(test.name),
Zone: test.zone,
}
link2 := testInstance.makeComparableHostPath()
if test.expectEqual && link1 != link2 {
t.Errorf("expected link1 and link2 to be equal, got %s and %s", link1, link2)
} else if !test.expectEqual && link1 == link2 {

View File

@ -137,7 +137,16 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (volum
requestBytes := c.options.Capacity.Value()
// GCE works with gigabytes, convert to GiB with rounding up
requestGB := volume.RoundUpSize(requestBytes, 1024*1024*1024)
err = cloud.CreateDisk(name, int64(requestGB))
// The disk will be created in the zone in which this code is currently running
// TODO: We should support auto-provisioning volumes in multiple/specified zones
zone, err := cloud.GetZone()
if err != nil {
glog.V(2).Infof("error getting zone information from GCE: %v", err)
return "", 0, err
}
err = cloud.CreateDisk(name, zone.FailureDomain, int64(requestGB))
if err != nil {
glog.V(2).Infof("Error creating GCE PD volume: %v", err)
return "", 0, err

View File

@ -119,7 +119,13 @@ func TestE2E(t *testing.T) {
Logf("Using service account %q as token source.", cloudConfig.ServiceAccount)
tokenSource = google.ComputeTokenSource(cloudConfig.ServiceAccount)
}
cloudConfig.Provider, err = gcecloud.CreateGCECloud(testContext.CloudConfig.ProjectID, testContext.CloudConfig.Zone, "" /* networkUrl */, tokenSource, false /* useMetadataServer */)
zone := testContext.CloudConfig.Zone
region, err := gcecloud.GetGCERegion(zone)
if err != nil {
glog.Fatalf("error parsing GCE region from zone %q: %v", zone, err)
}
managedZones := []string{zone} // Only single-zone for now
cloudConfig.Provider, err = gcecloud.CreateGCECloud(testContext.CloudConfig.ProjectID, region, zone, managedZones, "" /* networkUrl */, tokenSource, false /* useMetadataServer */)
if err != nil {
glog.Fatal("Error building GCE provider: ", err)
}

View File

@ -314,7 +314,7 @@ func createPD() (string, error) {
return "", err
}
err = gceCloud.CreateDisk(pdName, 10 /* sizeGb */)
err = gceCloud.CreateDisk(pdName, testContext.CloudConfig.Zone, 10 /* sizeGb */)
if err != nil {
return "", err
}