From 9063526dfb50918f20b3cde941aba3ac45966afc Mon Sep 17 00:00:00 2001 From: Nick Sardo Date: Thu, 25 May 2017 12:28:47 -0700 Subject: [PATCH] GCE: Refactor firewalls/backendservices api; other small changes --- pkg/cloudprovider/providers/gce/gce.go | 55 ++++++++- .../providers/gce/gce_backendservice.go | 93 +++++++++++--- .../providers/gce/gce_firewall.go | 83 +++---------- .../providers/gce/gce_healthchecks.go | 31 +++-- .../providers/gce/gce_instancegroup.go | 60 ++------- .../providers/gce/gce_instances.go | 114 ++++++++++++++++++ .../providers/gce/gce_loadbalancer.go | 84 +------------ pkg/cloudprovider/providers/gce/gce_test.go | 10 -- test/e2e/framework/ingress_utils.go | 4 +- 9 files changed, 297 insertions(+), 237 deletions(-) diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index a620afcb32b..5cb2d7bf447 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -29,6 +29,7 @@ import ( "gopkg.in/gcfg.v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/cloudprovider" @@ -77,19 +78,25 @@ const ( // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. type GCECloud struct { + ClusterID ClusterID + service *compute.Service serviceBeta *computebeta.Service containerService *container.Service clientBuilder controller.ControllerClientBuilder - ClusterID ClusterID projectID string region string localZone string // The zone in which we are running managedZones []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master) networkURL string subnetworkURL string - nodeTags []string // List of tags to use on firewall rules for load balancers - nodeInstancePrefix string // If non-"", an advisory prefix for all nodes in the cluster + networkProjectID string + onXPN bool + nodeTags []string // List of tags to use on firewall rules for load balancers + lastComputedNodeTags []string // List of node tags calculated in GetHostTags() + lastKnownNodeNames sets.String // List of hostnames used to calculate lastComputedHostTags in GetHostTags(names) + computeNodeTagLock sync.Mutex // Lock for computing and setting node tags + nodeInstancePrefix string // If non-"", an advisory prefix for all nodes in the cluster useMetadataServer bool operationPollRateLimiter flowcontrol.RateLimiter manager ServiceManager @@ -243,6 +250,12 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo networkURL = gceNetworkURL(projectID, networkName) } + networkProjectID, err := getProjectIDInURL(networkURL) + if err != nil { + return nil, err + } + onXPN := networkProjectID != projectID + if len(managedZones) == 0 { managedZones, err = getZonesForRegion(service, projectID, region) if err != nil { @@ -260,6 +273,8 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo serviceBeta: serviceBeta, containerService: containerService, projectID: projectID, + networkProjectID: networkProjectID, + onXPN: onXPN, region: region, localZone: zone, managedZones: managedZones, @@ -311,6 +326,26 @@ func (gce *GCECloud) ProviderName() string { return ProviderName } +// Region returns the region +func (gce *GCECloud) Region() string { + return gce.region +} + +// OnXPN returns true if the cluster is running on a cross project network (XPN) +func (gce *GCECloud) OnXPN() bool { + return gce.onXPN +} + +// NetworkURL returns the network url +func (gce *GCECloud) NetworkURL() string { + return gce.networkURL +} + +// SubnetworkURL returns the subnetwork url +func (gce *GCECloud) SubnetworkURL() string { + return gce.subnetworkURL +} + // Known-useless DNS search path. var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`) @@ -336,6 +371,20 @@ func gceSubnetworkURL(project, region, subnetwork string) string { return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", project, region, subnetwork) } +// getProjectIDInURL parses typical full resource URLS and shorter URLS +// https://www.googleapis.com/compute/v1/projects/myproject/global/networks/mycustom +// projects/myproject/global/networks/mycustom +// All return "myproject" +func getProjectIDInURL(urlStr string) (string, error) { + fields := strings.Split(urlStr, "/") + for i, v := range fields { + if v == "projects" && i < len(fields)-1 { + return fields[i+1], nil + } + } + return "", fmt.Errorf("could not find project field in url: %v", urlStr) +} + func getNetworkNameViaMetadata() (string, error) { result, err := metadata.Get("instance/network-interfaces/0/network") if err != nil { diff --git a/pkg/cloudprovider/providers/gce/gce_backendservice.go b/pkg/cloudprovider/providers/gce/gce_backendservice.go index 02b20e8ede9..1d79dfde0d3 100644 --- a/pkg/cloudprovider/providers/gce/gce_backendservice.go +++ b/pkg/cloudprovider/providers/gce/gce_backendservice.go @@ -30,15 +30,15 @@ func newBackendServiceMetricContext(request string) *metricContext { } } -// GetBackendService retrieves a backend by name. -func (gce *GCECloud) GetBackendService(name string) (*compute.BackendService, error) { +// GetGlobalBackendService retrieves a backend by name. +func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendService, error) { mc := newBackendServiceMetricContext("get") v, err := gce.service.BackendServices.Get(gce.projectID, name).Do() return v, mc.Observe(err) } -// UpdateBackendService applies the given BackendService as an update to an existing service. -func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error { +// UpdateGlobalBackendService applies the given BackendService as an update to an existing service. +func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error { mc := newBackendServiceMetricContext("update") op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do() if err != nil { @@ -48,8 +48,8 @@ func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error { return gce.waitForGlobalOp(op, mc) } -// DeleteBackendService deletes the given BackendService by name. -func (gce *GCECloud) DeleteBackendService(name string) error { +// DeleteGlobalBackendService deletes the given BackendService by name. +func (gce *GCECloud) DeleteGlobalBackendService(name string) error { mc := newBackendServiceMetricContext("delete") op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do() if err != nil { @@ -62,8 +62,8 @@ func (gce *GCECloud) DeleteBackendService(name string) error { return gce.waitForGlobalOp(op, mc) } -// CreateBackendService creates the given BackendService. -func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error { +// CreateGlobalBackendService creates the given BackendService. +func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) error { mc := newBackendServiceMetricContext("create") op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do() if err != nil { @@ -73,16 +73,81 @@ func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error { return gce.waitForGlobalOp(op, mc) } -// ListBackendServices lists all backend services in the project. -func (gce *GCECloud) ListBackendServices() (*compute.BackendServiceList, error) { +// ListGlobalBackendServices lists all backend services in the project. +func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, error) { + mc := newBackendServiceMetricContext("list") // TODO: use PageToken to list all not just the first 500 - return gce.service.BackendServices.List(gce.projectID).Do() + v, err := gce.service.BackendServices.List(gce.projectID).Do() + return v, mc.Observe(err) } -// GetHealth returns the health of the BackendService identified by the given +// GetGlobalBackendServiceHealth returns the health of the BackendService identified by the given // name, in the given instanceGroup. The instanceGroupLink is the fully // qualified self link of an instance group. -func (gce *GCECloud) GetHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { +func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { + mc := newBackendServiceMetricContext("get_health") groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink} - return gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do() + v, err := gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do() + return v, mc.Observe(err) +} + +// GetRegionBackendService retrieves a backend by name. +func (gce *GCECloud) GetRegionBackendService(name, region string) (*compute.BackendService, error) { + mc := newBackendServiceMetricContext("get") + v, err := gce.service.RegionBackendServices.Get(gce.projectID, region, name).Do() + return v, mc.Observe(err) +} + +// UpdateRegionBackendService applies the given BackendService as an update to an existing service. +func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService) error { + mc := newBackendServiceMetricContext("update") + op, err := gce.service.RegionBackendServices.Update(gce.projectID, bg.Region, bg.Name, bg).Do() + if err != nil { + return mc.Observe(err) + } + + return gce.waitForRegionOp(op, bg.Region, mc) +} + +// DeleteRegionBackendService deletes the given BackendService by name. +func (gce *GCECloud) DeleteRegionBackendService(name, region string) error { + mc := newBackendServiceMetricContext("delete") + op, err := gce.service.RegionBackendServices.Delete(gce.projectID, region, name).Do() + if err != nil { + if isHTTPErrorCode(err, http.StatusNotFound) { + return nil + } + return mc.Observe(err) + } + + return gce.waitForRegionOp(op, region, mc) +} + +// CreateRegionBackendService creates the given BackendService. +func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService) error { + mc := newBackendServiceMetricContext("create") + op, err := gce.service.RegionBackendServices.Insert(gce.projectID, bg.Region, bg).Do() + if err != nil { + return mc.Observe(err) + } + + return gce.waitForRegionOp(op, bg.Region, mc) +} + +// ListRegionBackendServices lists all backend services in the project. +func (gce *GCECloud) ListRegionBackendServices(region string) (*compute.BackendServiceList, error) { + mc := newBackendServiceMetricContext("list") + // TODO: use PageToken to list all not just the first 500 + v, err := gce.service.RegionBackendServices.List(gce.projectID, region).Do() + return v, mc.Observe(err) +} + +// GetRegionalBackendServiceHealth returns the health of the BackendService identified by the given +// name, in the given instanceGroup. The instanceGroupLink is the fully +// qualified self link of an instance group. +func (gce *GCECloud) GetRegionalBackendServiceHealth(name, region string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { + mc := newBackendServiceMetricContext("get_health") + groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink} + v, err := gce.service.RegionBackendServices.GetHealth(gce.projectID, region, name, groupRef).Do() + return v, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/gce_firewall.go b/pkg/cloudprovider/providers/gce/gce_firewall.go index 1826f20eb67..668f2e05522 100644 --- a/pkg/cloudprovider/providers/gce/gce_firewall.go +++ b/pkg/cloudprovider/providers/gce/gce_firewall.go @@ -19,96 +19,51 @@ package gce import ( "time" - "k8s.io/kubernetes/pkg/api/v1" - netsets "k8s.io/kubernetes/pkg/util/net/sets" - compute "google.golang.org/api/compute/v1" ) -func newFirewallMetricContext(request string, region string) *metricContext { +func newFirewallMetricContext(request string) *metricContext { return &metricContext{ start: time.Now(), - attributes: []string{"firewall_" + request, region, unusedMetricLabel}, + attributes: []string{"firewall_" + request, unusedMetricLabel, unusedMetricLabel}, } } // GetFirewall returns the Firewall by name. func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) { - mc := newFirewallMetricContext("get", "") + mc := newFirewallMetricContext("get") v, err := gce.service.Firewalls.Get(gce.projectID, name).Do() return v, mc.Observe(err) } -// CreateFirewall creates the given firewall rule. -func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error { - region, err := GetGCERegion(gce.localZone) +// CreateFirewall creates the passed firewall +func (gce *GCECloud) CreateFirewall(f *compute.Firewall) error { + mc := newFirewallMetricContext("create") + op, err := gce.service.Firewalls.Insert(gce.projectID, f).Do() if err != nil { - return err + return mc.Observe(err) } - mc := newFirewallMetricContext("create", region) - // TODO: This completely breaks modularity in the cloudprovider but - // the methods shared with the TCPLoadBalancer take v1.ServicePorts. - svcPorts := []v1.ServicePort{} - // TODO: Currently the only consumer of this method is the GCE L7 - // loadbalancer controller, which never needs a protocol other than - // TCP. We should pipe through a mapping of port:protocol and - // default to TCP if UDP ports are required. This means the method - // signature will change forcing downstream clients to refactor - // interfaces. - for _, p := range ports { - svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP}) - } - - hosts, err := gce.getInstancesByNames(hostNames) - if err != nil { - mc.Observe(err) - return err - } - - return mc.Observe(gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts)) + return gce.waitForGlobalOp(op, mc) } // DeleteFirewall deletes the given firewall rule. func (gce *GCECloud) DeleteFirewall(name string) error { - region, err := GetGCERegion(gce.localZone) + mc := newFirewallMetricContext("delete") + op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do() if err != nil { - return err + return mc.Observe(err) } - - mc := newFirewallMetricContext("delete", region) - - return mc.Observe(gce.deleteFirewall(name, region)) + return gce.waitForGlobalOp(op, mc) } -// UpdateFirewall applies the given firewall rule as an update to an -// existing firewall rule with the same name. -func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error { - - region, err := GetGCERegion(gce.localZone) +// UpdateFirewall applies the given firewall as an update to an existing service. +func (gce *GCECloud) UpdateFirewall(f *compute.Firewall) error { + mc := newFirewallMetricContext("update") + op, err := gce.service.Firewalls.Update(gce.projectID, f.Name, f).Do() if err != nil { - return err + return mc.Observe(err) } - mc := newFirewallMetricContext("update", region) - // TODO: This completely breaks modularity in the cloudprovider but - // the methods shared with the TCPLoadBalancer take v1.ServicePorts. - svcPorts := []v1.ServicePort{} - // TODO: Currently the only consumer of this method is the GCE L7 - // loadbalancer controller, which never needs a protocol other than - // TCP. We should pipe through a mapping of port:protocol and - // default to TCP if UDP ports are required. This means the method - // signature will change, forcing downstream clients to refactor - // interfaces. - for _, p := range ports { - svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP}) - } - - hosts, err := gce.getInstancesByNames(hostNames) - if err != nil { - mc.Observe(err) - return err - } - - return mc.Observe(gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts)) + return gce.waitForGlobalOp(op, mc) } diff --git a/pkg/cloudprovider/providers/gce/gce_healthchecks.go b/pkg/cloudprovider/providers/gce/gce_healthchecks.go index 6b0f6795c17..7bd060b54a7 100644 --- a/pkg/cloudprovider/providers/gce/gce_healthchecks.go +++ b/pkg/cloudprovider/providers/gce/gce_healthchecks.go @@ -29,11 +29,22 @@ import ( ) const ( - minNodesHealthCheckVersion = "1.7.0" - nodesHealthCheckPath = "/healthz" - lbNodesHealthCheckPort = ports.ProxyHealthzPort + nodesHealthCheckPath = "/healthz" + lbNodesHealthCheckPort = ports.ProxyHealthzPort ) +var ( + minNodesHealthCheckVersion *utilversion.Version +) + +func init() { + if v, err := utilversion.ParseGeneric("1.7.0"); err != nil { + panic(err) + } else { + minNodesHealthCheckVersion = v + } +} + func newHealthcheckMetricContext(request string) *metricContext { return &metricContext{ start: time.Now(), @@ -212,28 +223,22 @@ func makeNodesHealthCheckName(clusterID string) string { // MakeHealthCheckFirewallName returns the firewall name used by the GCE load // balancers (l4) for performing health checks. func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string { - // TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc. - fwName := "k8s-" + hcName + "-http-hc" if isNodesHealthCheck { - fwName = makeNodesHealthCheckName(clusterID) + "-http-hc" + // TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc. + return makeNodesHealthCheckName(clusterID) + "-http-hc" } - return fwName + return "k8s-" + hcName + "-http-hc" } // isAtLeastMinNodesHealthCheckVersion checks if a version is higher than // `minNodesHealthCheckVersion`. func isAtLeastMinNodesHealthCheckVersion(vstring string) bool { - minVersion, err := utilversion.ParseGeneric(minNodesHealthCheckVersion) - if err != nil { - glog.Errorf("MinNodesHealthCheckVersion (%s) is not a valid version string: %v", minNodesHealthCheckVersion, err) - return false - } version, err := utilversion.ParseGeneric(vstring) if err != nil { glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err) return false } - return version.AtLeast(minVersion) + return version.AtLeast(minNodesHealthCheckVersion) } // supportsNodesHealthCheck returns false if anyone of the nodes has version diff --git a/pkg/cloudprovider/providers/gce/gce_instancegroup.go b/pkg/cloudprovider/providers/gce/gce_instancegroup.go index e5ee03be020..c25aeb1af08 100644 --- a/pkg/cloudprovider/providers/gce/gce_instancegroup.go +++ b/pkg/cloudprovider/providers/gce/gce_instancegroup.go @@ -17,12 +17,8 @@ limitations under the License. package gce import ( - "fmt" - "net/http" - "strings" "time" - "github.com/golang/glog" compute "google.golang.org/api/compute/v1" ) @@ -37,7 +33,6 @@ func newInstanceGroupMetricContext(request string, zone string) *metricContext { // instances. It is the callers responsibility to add named ports. func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) { mc := newInstanceGroupMetricContext("create", zone) - op, err := gce.service.InstanceGroups.Insert( gce.projectID, zone, &compute.InstanceGroup{Name: name}).Do() if err != nil { @@ -55,12 +50,10 @@ func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.Ins // DeleteInstanceGroup deletes an instance group. func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error { mc := newInstanceGroupMetricContext("delete", zone) - op, err := gce.service.InstanceGroups.Delete( gce.projectID, zone, name).Do() if err != nil { - mc.Observe(err) - return err + return mc.Observe(err) } return gce.waitForZoneOp(op, zone, mc) @@ -103,10 +96,8 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta &compute.InstanceGroupsAddInstancesRequest{ Instances: instances, }).Do() - if err != nil { - mc.Observe(err) - return err + return mc.Observe(err) } return gce.waitForZoneOp(op, zone, mc) @@ -130,55 +121,24 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, &compute.InstanceGroupsRemoveInstancesRequest{ Instances: instances, }).Do() - if err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - mc.Observe(nil) - return nil - } - - mc.Observe(err) - return err + return mc.Observe(err) } return gce.waitForZoneOp(op, zone, mc) } -// AddPortToInstanceGroup adds a port to the given instance group. -func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int64) (*compute.NamedPort, error) { - mc := newInstanceGroupMetricContext("add_port", ig.Zone) - for _, np := range ig.NamedPorts { - if np.Port == port { - glog.V(3).Infof("Instance group %v already has named port %+v", ig.Name, np) - return np, nil - } - } - - glog.Infof("Adding port %v to instance group %v with %d ports", port, ig.Name, len(ig.NamedPorts)) - namedPort := compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port} - ig.NamedPorts = append(ig.NamedPorts, &namedPort) - - // setNamedPorts is a zonal endpoint, meaning we invoke it by re-creating a URL like: - // {project}/zones/{zone}/instanceGroups/{instanceGroup}/setNamedPorts, so the "zone" - // parameter given to SetNamedPorts must not be the entire zone URL. - zoneURLParts := strings.Split(ig.Zone, "/") - zone := zoneURLParts[len(zoneURLParts)-1] - +// SetNamedPortsOfInstanceGroup sets the list of named ports on a given instance group +func (gce *GCECloud) SetNamedPortsOfInstanceGroup(igName, zone string, namedPorts []*compute.NamedPort) error { + mc := newInstanceGroupMetricContext("set_namedports", zone) op, err := gce.service.InstanceGroups.SetNamedPorts( - gce.projectID, zone, ig.Name, - &compute.InstanceGroupsSetNamedPortsRequest{ - NamedPorts: ig.NamedPorts}).Do() - + gce.projectID, zone, igName, + &compute.InstanceGroupsSetNamedPortsRequest{NamedPorts: namedPorts}).Do() if err != nil { - mc.Observe(err) - return nil, err + return mc.Observe(err) } - if err = gce.waitForZoneOp(op, zone, mc); err != nil { - return nil, err - } - - return &namedPort, nil + return gce.waitForZoneOp(op, zone, mc) } // GetInstanceGroup returns an instance group by name. diff --git a/pkg/cloudprovider/providers/gce/gce_instances.go b/pkg/cloudprovider/providers/gce/gce_instances.go index 6446f26ff18..33437a34628 100644 --- a/pkg/cloudprovider/providers/gce/gce_instances.go +++ b/pkg/cloudprovider/providers/gce/gce_instances.go @@ -443,3 +443,117 @@ func (gce *GCECloud) isCurrentInstance(instanceID string) bool { return currentInstanceID == canonicalizeInstanceName(instanceID) } + +// ComputeHostTags grabs all tags from all instances being added to the pool. +// * The longest tag that is a prefix of the instance name is used +// * If any instance has no matching prefix tag, return error +// Invoking this method to get host tags is risky since it depends on the format +// of the host names in the cluster. Only use it as a fallback if gce.nodeTags +// is unspecified +func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) { + // TODO: We could store the tags in gceInstance, so we could have already fetched it + hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup) + nodeInstancePrefix := gce.nodeInstancePrefix + for _, host := range hosts { + if !strings.HasPrefix(host.Name, gce.nodeInstancePrefix) { + glog.Warningf("instance '%s' does not conform to prefix '%s', ignoring filter", host, gce.nodeInstancePrefix) + nodeInstancePrefix = "" + } + + z, ok := hostNamesByZone[host.Zone] + if !ok { + z = make(map[string]bool) + hostNamesByZone[host.Zone] = z + } + z[host.Name] = true + } + + tags := sets.NewString() + + for zone, hostNames := range hostNamesByZone { + pageToken := "" + page := 0 + for ; page == 0 || (pageToken != "" && page < maxPages); page++ { + listCall := gce.service.Instances.List(gce.projectID, zone) + + if nodeInstancePrefix != "" { + // Add the filter for hosts + listCall = listCall.Filter("name eq " + nodeInstancePrefix + ".*") + } + + // Add the fields we want + // TODO(zmerlynn): Internal bug 29524655 + // listCall = listCall.Fields("items(name,tags)") + + if pageToken != "" { + listCall = listCall.PageToken(pageToken) + } + + res, err := listCall.Do() + if err != nil { + return nil, err + } + pageToken = res.NextPageToken + for _, instance := range res.Items { + if !hostNames[instance.Name] { + continue + } + + longest_tag := "" + for _, tag := range instance.Tags.Items { + if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) { + longest_tag = tag + } + } + if len(longest_tag) > 0 { + tags.Insert(longest_tag) + } else { + return nil, fmt.Errorf("Could not find any tag that is a prefix of instance name for instance %s", instance.Name) + } + } + } + if page >= maxPages { + glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages) + } + } + if len(tags) == 0 { + return nil, fmt.Errorf("No instances found") + } + return tags.List(), nil +} + +// GetNodeTags will first try returning the list of tags specified in GCE cloud Configuration. +// If they weren't provided, it'll compute the host tags with the given hostnames. If the list +// of hostnames has not changed, a cached set of nodetags are returned. +func (gce *GCECloud) GetNodeTags(nodeNames []string) ([]string, error) { + // If nodeTags were specified through configuration, use them + if len(gce.nodeTags) > 0 { + return gce.nodeTags, nil + } + + gce.computeNodeTagLock.Lock() + defer gce.computeNodeTagLock.Unlock() + + // Early return if hosts have not changed + hosts := sets.NewString(nodeNames...) + if hosts.Equal(gce.lastKnownNodeNames) { + return gce.lastComputedNodeTags, nil + } + + // Get GCE instance data by hostname + instances, err := gce.getInstancesByNames(nodeNames) + if err != nil { + return nil, err + } + + // Determine list of host tags + tags, err := gce.computeHostTags(instances) + if err != nil { + return nil, err + } + + // Save the list of tags + gce.lastKnownNodeNames = hosts + gce.lastComputedNodeTags = tags + return tags, nil +} diff --git a/pkg/cloudprovider/providers/gce/gce_loadbalancer.go b/pkg/cloudprovider/providers/gce/gce_loadbalancer.go index dd6a09ef4d5..b3cf4e7792c 100644 --- a/pkg/cloudprovider/providers/gce/gce_loadbalancer.go +++ b/pkg/cloudprovider/providers/gce/gce_loadbalancer.go @@ -1010,7 +1010,7 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s } func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error { - mc := newFirewallMetricContext("create", region) + mc := newFirewallMetricContext("create") firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) if err != nil { return mc.Observe(err) @@ -1029,7 +1029,7 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets } func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error { - mc := newFirewallMetricContext("update", region) + mc := newFirewallMetricContext("update") firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) if err != nil { return mc.Observe(err) @@ -1083,84 +1083,6 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges nets return firewall, nil } -// ComputeHostTags grabs all tags from all instances being added to the pool. -// * The longest tag that is a prefix of the instance name is used -// * If any instance has no matching prefix tag, return error -// Invoking this method to get host tags is risky since it depends on the format -// of the host names in the cluster. Only use it as a fallback if gce.nodeTags -// is unspecified -func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) { - // TODO: We could store the tags in gceInstance, so we could have already fetched it - hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup) - nodeInstancePrefix := gce.nodeInstancePrefix - for _, host := range hosts { - if !strings.HasPrefix(host.Name, gce.nodeInstancePrefix) { - glog.Warningf("instance '%s' does not conform to prefix '%s', ignoring filter", host, gce.nodeInstancePrefix) - nodeInstancePrefix = "" - } - - z, ok := hostNamesByZone[host.Zone] - if !ok { - z = make(map[string]bool) - hostNamesByZone[host.Zone] = z - } - z[host.Name] = true - } - - tags := sets.NewString() - - for zone, hostNames := range hostNamesByZone { - pageToken := "" - page := 0 - for ; page == 0 || (pageToken != "" && page < maxPages); page++ { - listCall := gce.service.Instances.List(gce.projectID, zone) - - if nodeInstancePrefix != "" { - // Add the filter for hosts - listCall = listCall.Filter("name eq " + nodeInstancePrefix + ".*") - } - - // Add the fields we want - // TODO(zmerlynn): Internal bug 29524655 - // listCall = listCall.Fields("items(name,tags)") - - if pageToken != "" { - listCall = listCall.PageToken(pageToken) - } - - res, err := listCall.Do() - if err != nil { - return nil, err - } - pageToken = res.NextPageToken - for _, instance := range res.Items { - if !hostNames[instance.Name] { - continue - } - - longest_tag := "" - for _, tag := range instance.Tags.Items { - if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) { - longest_tag = tag - } - } - if len(longest_tag) > 0 { - tags.Insert(longest_tag) - } else { - return nil, fmt.Errorf("Could not find any tag that is a prefix of instance name for instance %s", instance.Name) - } - } - } - if page >= maxPages { - glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages) - } - } - if len(tags) == 0 { - return nil, fmt.Errorf("No instances found") - } - return tags.List(), nil -} - func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) { pageToken := "" page := 0 @@ -1232,7 +1154,7 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string } func (gce *GCECloud) deleteFirewall(name, region string) error { - mc := newFirewallMetricContext("delete", region) + mc := newFirewallMetricContext("delete") op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { diff --git a/pkg/cloudprovider/providers/gce/gce_test.go b/pkg/cloudprovider/providers/gce/gce_test.go index c05f0bdccbe..c04473f9380 100644 --- a/pkg/cloudprovider/providers/gce/gce_test.go +++ b/pkg/cloudprovider/providers/gce/gce_test.go @@ -149,16 +149,6 @@ func TestScrubDNS(t *testing.T) { } } -func TestCreateFirewallFails(t *testing.T) { - name := "loadbalancer" - region := "us-central1" - desc := "description" - gce := &GCECloud{} - if err := gce.createFirewall(name, region, desc, nil, nil, nil); err == nil { - t.Errorf("error expected when creating firewall without any tags found") - } -} - func TestSplitProviderID(t *testing.T) { providers := []struct { providerID string diff --git a/test/e2e/framework/ingress_utils.go b/test/e2e/framework/ingress_utils.go index c801329fbd4..4b9912bcb9a 100644 --- a/test/e2e/framework/ingress_utils.go +++ b/test/e2e/framework/ingress_utils.go @@ -478,7 +478,7 @@ func (cont *GCEIngressController) deleteURLMap(del bool) (msg string) { func (cont *GCEIngressController) deleteBackendService(del bool) (msg string) { gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud) - beList, err := gceCloud.ListBackendServices() + beList, err := gceCloud.ListGlobalBackendServices() if err != nil { if cont.isHTTPErrorCode(err, http.StatusNotFound) { return msg @@ -495,7 +495,7 @@ func (cont *GCEIngressController) deleteBackendService(del bool) (msg string) { } if del { Logf("Deleting backed-service: %s", be.Name) - if err := gceCloud.DeleteBackendService(be.Name); err != nil && + if err := gceCloud.DeleteGlobalBackendService(be.Name); err != nil && !cont.isHTTPErrorCode(err, http.StatusNotFound) { msg += fmt.Sprintf("Failed to delete backend service %v: %v\n", be.Name, err) }