diff --git a/pkg/cloudprovider/providers/gce/BUILD b/pkg/cloudprovider/providers/gce/BUILD index 2d407afb860..2155b28cde1 100644 --- a/pkg/cloudprovider/providers/gce/BUILD +++ b/pkg/cloudprovider/providers/gce/BUILD @@ -13,6 +13,8 @@ go_library( srcs = [ "doc.go", "gce.go", + "gce_addresses.go", + "gce_annotations.go", "gce_backendservice.go", "gce_cert.go", "gce_clusterid.go", @@ -24,9 +26,12 @@ go_library( "gce_instancegroup.go", "gce_instances.go", "gce_loadbalancer.go", + "gce_loadbalancer_external.go", + "gce_loadbalancer_internal.go", + "gce_loadbalancer_naming.go", "gce_op.go", "gce_routes.go", - "gce_staticip.go", + "gce_targetpool.go", "gce_targetproxy.go", "gce_urlmap.go", "gce_util.go", diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 5cb2d7bf447..90e2570a429 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -78,6 +78,8 @@ const ( // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. type GCECloud struct { + // ClusterID contains functionality for getting (and initializing) the ingress-uid. Call GCECloud.Initialize() + // for the cloudprovider to start watching the configmap. ClusterID ClusterID service *compute.Service diff --git a/pkg/cloudprovider/providers/gce/gce_addresses.go b/pkg/cloudprovider/providers/gce/gce_addresses.go new file mode 100644 index 00000000000..877d744ddfa --- /dev/null +++ b/pkg/cloudprovider/providers/gce/gce_addresses.go @@ -0,0 +1,96 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gce + +import ( + "time" + + compute "google.golang.org/api/compute/v1" +) + +func newAddressMetricContext(request, region string) *metricContext { + return &metricContext{ + start: time.Now(), + attributes: []string{"address_" + request, region, unusedMetricLabel}, + } +} + +// ReserveGlobalAddress creates a global address. +// Caller is allocated a random IP if they do not specify an ipAddress. If an +// ipAddress is specified, it must belong to the current project, eg: an +// ephemeral IP associated with a global forwarding rule. +func (gce *GCECloud) ReserveGlobalAddress(addr *compute.Address) (*compute.Address, error) { + mc := newAddressMetricContext("reserve", "") + op, err := gce.service.GlobalAddresses.Insert(gce.projectID, addr).Do() + if err != nil { + return nil, mc.Observe(err) + } + + if err := gce.waitForGlobalOp(op, mc); err != nil { + return nil, err + } + + return gce.GetGlobalAddress(addr.Name) +} + +// DeleteGlobalAddress deletes a global address by name. +func (gce *GCECloud) DeleteGlobalAddress(name string) error { + mc := newAddressMetricContext("delete", "") + op, err := gce.service.GlobalAddresses.Delete(gce.projectID, name).Do() + if err != nil { + return mc.Observe(err) + } + return gce.waitForGlobalOp(op, mc) +} + +// GetGlobalAddress returns the global address by name. 
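Usage sketch (editor's note, not part of this change): the reserve → get → delete lifecycle of the global-address helpers above, assuming a configured *GCECloud plus the compute and glog imports this package already uses; the address name is hypothetical.

func exampleGlobalAddressLifecycle(gce *GCECloud) error {
	// Name-only reservation: GCE allocates the IP because addr.Address is unset.
	addr, err := gce.ReserveGlobalAddress(&compute.Address{Name: "example-ip"})
	if err != nil {
		return err
	}
	glog.V(2).Infof("reserved global address %s -> %s", addr.Name, addr.Address)
	// Release the reservation; ReserveGlobalAddress already waited on the insert op.
	return gce.DeleteGlobalAddress(addr.Name)
}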
+func (gce *GCECloud) GetGlobalAddress(name string) (*compute.Address, error) {
+	mc := newAddressMetricContext("get", "")
+	v, err := gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
+	return v, mc.Observe(err)
+}
+
+// ReserveRegionAddress creates a region address.
+func (gce *GCECloud) ReserveRegionAddress(addr *compute.Address, region string) (*compute.Address, error) {
+	mc := newAddressMetricContext("reserve", region)
+	op, err := gce.service.Addresses.Insert(gce.projectID, region, addr).Do()
+	if err != nil {
+		return nil, mc.Observe(err)
+	}
+	if err := gce.waitForRegionOp(op, region, mc); err != nil {
+		return nil, err
+	}
+
+	return gce.GetRegionAddress(addr.Name, region)
+}
+
+// DeleteRegionAddress deletes a region address by name.
+func (gce *GCECloud) DeleteRegionAddress(name, region string) error {
+	mc := newAddressMetricContext("delete", region)
+	op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
+	if err != nil {
+		return mc.Observe(err)
+	}
+	return gce.waitForRegionOp(op, region, mc)
+}
+
+// GetRegionAddress returns the region address by name.
+func (gce *GCECloud) GetRegionAddress(name, region string) (*compute.Address, error) {
+	mc := newAddressMetricContext("get", region)
+	v, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
+	return v, mc.Observe(err)
+}
diff --git a/pkg/cloudprovider/providers/gce/gce_annotations.go b/pkg/cloudprovider/providers/gce/gce_annotations.go
new file mode 100644
index 00000000000..98c72577a40
--- /dev/null
+++ b/pkg/cloudprovider/providers/gce/gce_annotations.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package gce
+
+import "k8s.io/kubernetes/pkg/api/v1"
+
+type LoadBalancerType string
+
+const (
+	// ServiceAnnotationLoadBalancerType is the annotation used on a service of type LoadBalancer.
+	// It dictates which specific kind of GCP LB should be assembled.
+	// Currently, only "internal" is supported.
+	ServiceAnnotationLoadBalancerType = "cloud.google.com/load-balancer-type"
+
+	LBTypeInternal LoadBalancerType = "internal"
+)
+
+// GetLoadBalancerAnnotationType returns the type of GCP load balancer which should be assembled.
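Usage sketch (editor's note, not part of this change): how the annotation feeds the function defined below; any annotation value other than "internal" yields ok == false.

func exampleAnnotationLookup() {
	svc := &v1.Service{}
	svc.Spec.Type = v1.ServiceTypeLoadBalancer
	svc.Annotations = map[string]string{
		ServiceAnnotationLoadBalancerType: string(LBTypeInternal),
	}
	t, ok := GetLoadBalancerAnnotationType(svc)
	_, _ = t, ok // t == LBTypeInternal, ok == true
}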
+func GetLoadBalancerAnnotationType(service *v1.Service) (LoadBalancerType, bool) { + v := LoadBalancerType("") + if service.Spec.Type != v1.ServiceTypeLoadBalancer { + return v, false + } + + l, ok := service.Annotations[ServiceAnnotationLoadBalancerType] + v = LoadBalancerType(l) + if !ok { + return v, false + } + + switch v { + case LBTypeInternal: + return v, true + default: + return v, false + } +} diff --git a/pkg/cloudprovider/providers/gce/gce_backendservice.go b/pkg/cloudprovider/providers/gce/gce_backendservice.go index 1d79dfde0d3..d460c6157d8 100644 --- a/pkg/cloudprovider/providers/gce/gce_backendservice.go +++ b/pkg/cloudprovider/providers/gce/gce_backendservice.go @@ -23,23 +23,23 @@ import ( compute "google.golang.org/api/compute/v1" ) -func newBackendServiceMetricContext(request string) *metricContext { +func newBackendServiceMetricContext(request, region string) *metricContext { return &metricContext{ start: time.Now(), - attributes: []string{"backendservice_" + request, unusedMetricLabel, unusedMetricLabel}, + attributes: []string{"backendservice_" + request, region, unusedMetricLabel}, } } // GetGlobalBackendService retrieves a backend by name. func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendService, error) { - mc := newBackendServiceMetricContext("get") + mc := newBackendServiceMetricContext("get", "") v, err := gce.service.BackendServices.Get(gce.projectID, name).Do() return v, mc.Observe(err) } // UpdateGlobalBackendService applies the given BackendService as an update to an existing service. func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error { - mc := newBackendServiceMetricContext("update") + mc := newBackendServiceMetricContext("update", "") op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do() if err != nil { return mc.Observe(err) @@ -50,7 +50,7 @@ func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) erro // DeleteGlobalBackendService deletes the given BackendService by name. func (gce *GCECloud) DeleteGlobalBackendService(name string) error { - mc := newBackendServiceMetricContext("delete") + mc := newBackendServiceMetricContext("delete", "") op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do() if err != nil { if isHTTPErrorCode(err, http.StatusNotFound) { @@ -64,7 +64,7 @@ func (gce *GCECloud) DeleteGlobalBackendService(name string) error { // CreateGlobalBackendService creates the given BackendService. func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) error { - mc := newBackendServiceMetricContext("create") + mc := newBackendServiceMetricContext("create", "") op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do() if err != nil { return mc.Observe(err) @@ -75,7 +75,7 @@ func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) erro // ListGlobalBackendServices lists all backend services in the project. func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, error) { - mc := newBackendServiceMetricContext("list") + mc := newBackendServiceMetricContext("list", "") // TODO: use PageToken to list all not just the first 500 v, err := gce.service.BackendServices.List(gce.projectID).Do() return v, mc.Observe(err) @@ -85,7 +85,7 @@ func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, e // name, in the given instanceGroup. The instanceGroupLink is the fully // qualified self link of an instance group. 
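Reading sketch for the health wrapper documented above (editor's note, not part of this change; field names from google.golang.org/api/compute/v1, service name hypothetical):

func exampleBackendHealth(gce *GCECloud, instanceGroupLink string) error {
	health, err := gce.GetGlobalBackendServiceHealth("example-bs", instanceGroupLink)
	if err != nil {
		return err
	}
	for _, hs := range health.HealthStatus {
		// hs.HealthState is e.g. "HEALTHY" or "UNHEALTHY".
		glog.V(4).Infof("%s: %s", hs.Instance, hs.HealthState)
	}
	return nil
}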
func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { - mc := newBackendServiceMetricContext("get_health") + mc := newBackendServiceMetricContext("get_health", "") groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink} v, err := gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do() return v, mc.Observe(err) @@ -93,25 +93,25 @@ func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLin // GetRegionBackendService retrieves a backend by name. func (gce *GCECloud) GetRegionBackendService(name, region string) (*compute.BackendService, error) { - mc := newBackendServiceMetricContext("get") + mc := newBackendServiceMetricContext("get", region) v, err := gce.service.RegionBackendServices.Get(gce.projectID, region, name).Do() return v, mc.Observe(err) } // UpdateRegionBackendService applies the given BackendService as an update to an existing service. -func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService) error { - mc := newBackendServiceMetricContext("update") - op, err := gce.service.RegionBackendServices.Update(gce.projectID, bg.Region, bg.Name, bg).Do() +func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService, region string) error { + mc := newBackendServiceMetricContext("update", region) + op, err := gce.service.RegionBackendServices.Update(gce.projectID, region, bg.Name, bg).Do() if err != nil { return mc.Observe(err) } - return gce.waitForRegionOp(op, bg.Region, mc) + return gce.waitForRegionOp(op, region, mc) } // DeleteRegionBackendService deletes the given BackendService by name. func (gce *GCECloud) DeleteRegionBackendService(name, region string) error { - mc := newBackendServiceMetricContext("delete") + mc := newBackendServiceMetricContext("delete", region) op, err := gce.service.RegionBackendServices.Delete(gce.projectID, region, name).Do() if err != nil { if isHTTPErrorCode(err, http.StatusNotFound) { @@ -124,19 +124,19 @@ func (gce *GCECloud) DeleteRegionBackendService(name, region string) error { } // CreateRegionBackendService creates the given BackendService. -func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService) error { - mc := newBackendServiceMetricContext("create") - op, err := gce.service.RegionBackendServices.Insert(gce.projectID, bg.Region, bg).Do() +func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService, region string) error { + mc := newBackendServiceMetricContext("create", region) + op, err := gce.service.RegionBackendServices.Insert(gce.projectID, region, bg).Do() if err != nil { return mc.Observe(err) } - return gce.waitForRegionOp(op, bg.Region, mc) + return gce.waitForRegionOp(op, region, mc) } // ListRegionBackendServices lists all backend services in the project. func (gce *GCECloud) ListRegionBackendServices(region string) (*compute.BackendServiceList, error) { - mc := newBackendServiceMetricContext("list") + mc := newBackendServiceMetricContext("list", region) // TODO: use PageToken to list all not just the first 500 v, err := gce.service.RegionBackendServices.List(gce.projectID, region).Do() return v, mc.Observe(err) @@ -146,7 +146,7 @@ func (gce *GCECloud) ListRegionBackendServices(region string) (*compute.BackendS // name, in the given instanceGroup. The instanceGroupLink is the fully // qualified self link of an instance group. 
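Note the signature change above: the regional Create/Update wrappers now take the region explicitly instead of reading bg.Region. A call-site sketch (editor's note, hypothetical names):

func exampleCreateRegionBS(gce *GCECloud) error {
	bs := &compute.BackendService{Name: "example-bs", Protocol: "TCP"}
	// bs.Region is no longer consulted; the region parameter decides the scope.
	return gce.CreateRegionBackendService(bs, "us-central1")
}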
func (gce *GCECloud) GetRegionalBackendServiceHealth(name, region string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { - mc := newBackendServiceMetricContext("get_health") + mc := newBackendServiceMetricContext("get_health", region) groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink} v, err := gce.service.RegionBackendServices.GetHealth(gce.projectID, region, name, groupRef).Do() return v, mc.Observe(err) diff --git a/pkg/cloudprovider/providers/gce/gce_forwardingrule.go b/pkg/cloudprovider/providers/gce/gce_forwardingrule.go index 0eedf0559cb..de800547ceb 100644 --- a/pkg/cloudprovider/providers/gce/gce_forwardingrule.go +++ b/pkg/cloudprovider/providers/gce/gce_forwardingrule.go @@ -17,7 +17,6 @@ limitations under the License. package gce import ( - "net/http" "time" compute "google.golang.org/api/compute/v1" @@ -44,8 +43,7 @@ func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portR } op, err := gce.service.GlobalForwardingRules.Insert(gce.projectID, rule).Do() if err != nil { - mc.Observe(err) - return nil, err + return nil, mc.Observe(err) } if err = gce.waitForGlobalOp(op, mc); err != nil { return nil, err @@ -56,13 +54,12 @@ func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portR // SetProxyForGlobalForwardingRule links the given TargetHttp(s)Proxy with the given GlobalForwardingRule. // targetProxyLink is the SelfLink of a TargetHttp(s)Proxy. -func (gce *GCECloud) SetProxyForGlobalForwardingRule(fw *compute.ForwardingRule, targetProxyLink string) error { +func (gce *GCECloud) SetProxyForGlobalForwardingRule(forwardingRuleName, targetProxyLink string) error { mc := newForwardingRuleMetricContext("set_proxy", "") op, err := gce.service.GlobalForwardingRules.SetTarget( - gce.projectID, fw.Name, &compute.TargetReference{Target: targetProxyLink}).Do() + gce.projectID, forwardingRuleName, &compute.TargetReference{Target: targetProxyLink}).Do() if err != nil { - mc.Observe(err) - return err + return mc.Observe(err) } return gce.waitForGlobalOp(op, mc) @@ -73,13 +70,7 @@ func (gce *GCECloud) DeleteGlobalForwardingRule(name string) error { mc := newForwardingRuleMetricContext("delete", "") op, err := gce.service.GlobalForwardingRules.Delete(gce.projectID, name).Do() if err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - mc.Observe(nil) - return nil - } - - mc.Observe(err) - return err + return mc.Observe(err) } return gce.waitForGlobalOp(op, mc) @@ -99,3 +90,41 @@ func (gce *GCECloud) ListGlobalForwardingRules() (*compute.ForwardingRuleList, e v, err := gce.service.GlobalForwardingRules.List(gce.projectID).Do() return v, mc.Observe(err) } + +// GetRegionForwardingRule returns the RegionalForwardingRule by name & region. +func (gce *GCECloud) GetRegionForwardingRule(name, region string) (*compute.ForwardingRule, error) { + mc := newForwardingRuleMetricContext("get", region) + v, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() + return v, mc.Observe(err) +} + +// ListRegionForwardingRules lists all RegionalForwardingRules in the project & region. 
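Lifecycle sketch for the regional forwarding-rule helpers (List/Create/Delete follow just below; editor's note, not part of this change — the fields tying the rule to a backend service or target pool are elided here):

func exampleRegionForwardingRule(gce *GCECloud) error {
	rule := &compute.ForwardingRule{
		Name:       "example-fwd-rule",
		IPProtocol: "TCP",
		PortRange:  "80-80",
	}
	if err := gce.CreateRegionForwardingRule(rule, "us-central1"); err != nil {
		return err
	}
	return gce.DeleteRegionForwardingRule(rule.Name, "us-central1")
}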
+func (gce *GCECloud) ListRegionForwardingRules(region string) (*compute.ForwardingRuleList, error) { + mc := newForwardingRuleMetricContext("list", region) + // TODO: use PageToken to list all not just the first 500 + v, err := gce.service.ForwardingRules.List(gce.projectID, region).Do() + return v, mc.Observe(err) +} + +// CreateRegionForwardingRule creates and returns a +// RegionalForwardingRule that points to the given BackendService +func (gce *GCECloud) CreateRegionForwardingRule(rule *compute.ForwardingRule, region string) error { + mc := newForwardingRuleMetricContext("create", region) + op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, rule).Do() + if err != nil { + return mc.Observe(err) + } + + return gce.waitForRegionOp(op, region, mc) +} + +// DeleteRegionForwardingRule deletes the RegionalForwardingRule by name & region. +func (gce *GCECloud) DeleteRegionForwardingRule(name, region string) error { + mc := newForwardingRuleMetricContext("delete", region) + op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() + if err != nil { + return mc.Observe(err) + } + + return gce.waitForRegionOp(op, region, mc) +} diff --git a/pkg/cloudprovider/providers/gce/gce_healthchecks.go b/pkg/cloudprovider/providers/gce/gce_healthchecks.go index 7bd060b54a7..4f5f90c6a1d 100644 --- a/pkg/cloudprovider/providers/gce/gce_healthchecks.go +++ b/pkg/cloudprovider/providers/gce/gce_healthchecks.go @@ -17,7 +17,6 @@ limitations under the License. package gce import ( - "fmt" "time" "k8s.io/kubernetes/pkg/api/v1" @@ -208,28 +207,12 @@ func GetNodesHealthCheckPort() int32 { return lbNodesHealthCheckPort } -// getNodesHealthCheckPath returns the health check path used by the GCE load +// GetNodesHealthCheckPath returns the health check path used by the GCE load // balancers (l4) for performing health checks on nodes. -func getNodesHealthCheckPath() string { +func GetNodesHealthCheckPath() string { return nodesHealthCheckPath } -// makeNodesHealthCheckName returns name of the health check resource used by -// the GCE load balancers (l4) for performing health checks on nodes. -func makeNodesHealthCheckName(clusterID string) string { - return fmt.Sprintf("k8s-%v-node", clusterID) -} - -// MakeHealthCheckFirewallName returns the firewall name used by the GCE load -// balancers (l4) for performing health checks. -func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string { - if isNodesHealthCheck { - // TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc. - return makeNodesHealthCheckName(clusterID) + "-http-hc" - } - return "k8s-" + hcName + "-http-hc" -} - // isAtLeastMinNodesHealthCheckVersion checks if a version is higher than // `minNodesHealthCheckVersion`. func isAtLeastMinNodesHealthCheckVersion(vstring string) bool { diff --git a/pkg/cloudprovider/providers/gce/gce_instancegroup.go b/pkg/cloudprovider/providers/gce/gce_instancegroup.go index c25aeb1af08..f04d5676cca 100644 --- a/pkg/cloudprovider/providers/gce/gce_instancegroup.go +++ b/pkg/cloudprovider/providers/gce/gce_instancegroup.go @@ -81,20 +81,16 @@ func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, stat // AddInstancesToInstanceGroup adds the given instances to the given // instance group. 
-func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceNames []string) error { +func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error { mc := newInstanceGroupMetricContext("add_instances", zone) - if len(instanceNames) == 0 { + if len(instanceRefs) == 0 { return nil } - // Adding the same instance twice will result in a 4xx error - instances := []*compute.InstanceReference{} - for _, ins := range instanceNames { - instances = append(instances, &compute.InstanceReference{Instance: makeHostURL(gce.projectID, zone, ins)}) - } + op, err := gce.service.InstanceGroups.AddInstances( gce.projectID, zone, name, &compute.InstanceGroupsAddInstancesRequest{ - Instances: instances, + Instances: instanceRefs, }).Do() if err != nil { return mc.Observe(err) @@ -105,21 +101,16 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta // RemoveInstancesFromInstanceGroup removes the given instances from // the instance group. -func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceNames []string) error { +func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error { mc := newInstanceGroupMetricContext("remove_instances", zone) - if len(instanceNames) == 0 { + if len(instanceRefs) == 0 { return nil } - instances := []*compute.InstanceReference{} - for _, ins := range instanceNames { - instanceLink := makeHostURL(gce.projectID, zone, ins) - instances = append(instances, &compute.InstanceReference{Instance: instanceLink}) - } op, err := gce.service.InstanceGroups.RemoveInstances( gce.projectID, zone, name, &compute.InstanceGroupsRemoveInstancesRequest{ - Instances: instances, + Instances: instanceRefs, }).Do() if err != nil { return mc.Observe(err) diff --git a/pkg/cloudprovider/providers/gce/gce_instances.go b/pkg/cloudprovider/providers/gce/gce_instances.go index 33437a34628..3afd3f2969f 100644 --- a/pkg/cloudprovider/providers/gce/gce_instances.go +++ b/pkg/cloudprovider/providers/gce/gce_instances.go @@ -33,6 +33,11 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/cloudprovider" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" +) + +const ( + defaultZone = "" ) func newInstancesMetricContext(request, zone string) *metricContext { @@ -42,6 +47,34 @@ func newInstancesMetricContext(request, zone string) *metricContext { } } +func splitNodesByZone(nodes []*v1.Node) map[string][]*v1.Node { + zones := make(map[string][]*v1.Node) + for _, n := range nodes { + z := getZone(n) + if z != defaultZone { + zones[z] = append(zones[z], n) + } + } + return zones +} + +func getZone(n *v1.Node) string { + zone, ok := n.Labels[kubeletapis.LabelZoneFailureDomain] + if !ok { + return defaultZone + } + return zone +} + +// ToInstanceReferences returns instance references by links +func (gce *GCECloud) ToInstanceReferences(zone string, instanceNames []string) (refs []*compute.InstanceReference) { + for _, ins := range instanceNames { + instanceLink := makeHostURL(gce.projectID, zone, ins) + refs = append(refs, &compute.InstanceReference{Instance: instanceLink}) + } + return refs +} + // NodeAddresses is an implementation of Instances.NodeAddresses. 
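Sketch tying the new helpers together (editor's note, not part of this change): split nodes by zone, convert names to fully qualified references, and add them to a per-zone instance group. Per the comment removed above, re-adding an existing member is a 4xx error, so real callers presumably diff membership first.

func exampleSyncInstanceGroups(gce *GCECloud, nodes []*v1.Node) error {
	for zone, zoneNodes := range splitNodesByZone(nodes) {
		names := make([]string, 0, len(zoneNodes))
		for _, n := range zoneNodes {
			names = append(names, n.Name)
		}
		refs := gce.ToInstanceReferences(zone, names)
		if err := gce.AddInstancesToInstanceGroup("example-ig", zone, refs); err != nil {
			return err
		}
	}
	return nil
}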
func (gce *GCECloud) NodeAddresses(_ types.NodeName) ([]v1.NodeAddress, error) { internalIP, err := metadata.Get("instance/network-interfaces/0/ip") diff --git a/pkg/cloudprovider/providers/gce/gce_loadbalancer.go b/pkg/cloudprovider/providers/gce/gce_loadbalancer.go index b3cf4e7792c..b449d62b64a 100644 --- a/pkg/cloudprovider/providers/gce/gce_loadbalancer.go +++ b/pkg/cloudprovider/providers/gce/gce_loadbalancer.go @@ -20,21 +20,14 @@ import ( "flag" "fmt" "net" - "net/http" - "strconv" "strings" "time" - "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/sets" + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api/v1" - apiservice "k8s.io/kubernetes/pkg/api/v1/service" "k8s.io/kubernetes/pkg/cloudprovider" netsets "k8s.io/kubernetes/pkg/util/net/sets" - - "github.com/golang/glog" - compute "google.golang.org/api/compute/v1" ) type cidrs struct { @@ -53,22 +46,16 @@ func newLoadBalancerMetricContext(request, region string) *metricContext { } } -func newTargetPoolMetricContext(request, region string) *metricContext { - return &metricContext{ - start: time.Now(), - attributes: []string{"targetpool_" + request, region, unusedMetricLabel}, - } -} +type lbScheme string -func newAddressMetricContext(request, region string) *metricContext { - return &metricContext{ - start: time.Now(), - attributes: []string{"address_" + request, region, unusedMetricLabel}, - } -} +const ( + schemeExternal lbScheme = "EXTERNAL" + schemeInternal lbScheme = "INTERNAL" +) func init() { var err error + // LB L7 proxies and all L3/4/7 health checkers have client addresses within these known CIDRs. lbSrcRngsFlag.ipn, err = netsets.ParseIPNets([]string{"130.211.0.0/22", "35.191.0.0/16", "209.85.152.0/22", "209.85.204.0/22"}...) if err != nil { panic("Incorrect default GCE L7 source ranges") @@ -110,1080 +97,109 @@ func LoadBalancerSrcRanges() []string { } // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer -func (gce *GCECloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) { - loadBalancerName := cloudprovider.GetLoadBalancerName(service) - fwd, err := gce.service.ForwardingRules.Get(gce.projectID, gce.region, loadBalancerName).Do() +func (gce *GCECloud) GetLoadBalancer(clusterName string, svc *v1.Service) (*v1.LoadBalancerStatus, bool, error) { + loadBalancerName := cloudprovider.GetLoadBalancerName(svc) + fwd, err := gce.GetRegionForwardingRule(loadBalancerName, gce.region) if err == nil { status := &v1.LoadBalancerStatus{} status.Ingress = []v1.LoadBalancerIngress{{IP: fwd.IPAddress}} return status, true, nil } - if isHTTPErrorCode(err, http.StatusNotFound) { - return nil, false, nil - } - return nil, false, err + return nil, false, ignoreNotFound(err) } // EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer. -// Our load balancers in GCE consist of four separate GCE resources - a static -// IP address, a firewall rule, a target pool, and a forwarding rule. This -// function has to manage all of them. -// -// Due to an interesting series of design decisions, this handles both creating -// new load balancers and updating existing load balancers, recognizing when -// each is needed. 
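The rewritten load-balancer entry points below dispatch on getSvcScheme, which this section does not include. A plausible reconstruction from the annotation helper in gce_annotations.go (editor's assumption, not confirmed by the diff):

func getSvcScheme(svc *v1.Service) lbScheme {
	if t, ok := GetLoadBalancerAnnotationType(svc); ok && t == LBTypeInternal {
		return schemeInternal
	}
	return schemeExternal
}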
-func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { - if len(nodes) == 0 { - return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts") - } - - hostNames := nodeNames(nodes) - supportsNodesHealthCheck := supportsNodesHealthCheck(nodes) - hosts, err := gce.getInstancesByNames(hostNames) - if err != nil { - return nil, err - } - - loadBalancerName := cloudprovider.GetLoadBalancerName(apiService) - loadBalancerIP := apiService.Spec.LoadBalancerIP - ports := apiService.Spec.Ports - portStr := []string{} - for _, p := range apiService.Spec.Ports { - portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port)) - } - - affinityType := apiService.Spec.SessionAffinity - - serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name} - glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", - loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations) - - // Check if the forwarding rule exists, and if so, what its IP is. - fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports) - if err != nil { - return nil, err - } - if !fwdRuleExists { - glog.V(2).Infof("Forwarding rule %v for Service %v/%v doesn't exist", - loadBalancerName, apiService.Namespace, apiService.Name) - } - - // Make sure we know which IP address will be used and have properly reserved - // it as static before moving forward with the rest of our operations. - // - // We use static IP addresses when updating a load balancer to ensure that we - // can replace the load balancer's other components without changing the - // address its service is reachable on. We do it this way rather than always - // keeping the static IP around even though this is more complicated because - // it makes it less likely that we'll run into quota issues. Only 7 static - // IP addresses are allowed per region by default. - // - // We could let an IP be allocated for us when the forwarding rule is created, - // but we need the IP to set up the firewall rule, and we want to keep the - // forwarding rule creation as the last thing that needs to be done in this - // function in order to maintain the invariant that "if the forwarding rule - // exists, the LB has been fully created". - ipAddress := "" - - // Through this process we try to keep track of whether it is safe to - // release the IP that was allocated. If the user specifically asked for - // an IP, we assume they are managing it themselves. Otherwise, we will - // release the IP in case of early-terminating failure or upon successful - // creating of the LB. - // TODO(#36535): boil this logic down into a set of component functions - // and key the flag values off of errors returned. 
- isUserOwnedIP := false // if this is set, we never release the IP - isSafeToReleaseIP := false - defer func() { - if isUserOwnedIP { - return - } - if isSafeToReleaseIP { - if err := gce.deleteStaticIP(loadBalancerName, gce.region); err != nil { - glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err) - } - glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress) - } else { - glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err) - } - }() - - if loadBalancerIP != "" { - // If a specific IP address has been requested, we have to respect the - // user's request and use that IP. If the forwarding rule was already using - // a different IP, it will be harmlessly abandoned because it was only an - // ephemeral IP (or it was a different static IP owned by the user, in which - // case we shouldn't delete it anyway). - if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil { - return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err) - } else if isStatic { - // The requested IP is a static IP, owned and managed by the user. - isUserOwnedIP = true - isSafeToReleaseIP = false - ipAddress = loadBalancerIP - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress) - } else if loadBalancerIP == fwdRuleIP { - // The requested IP is not a static IP, but is currently assigned - // to this forwarding rule, so we can keep it. - isUserOwnedIP = false - isSafeToReleaseIP = true - ipAddress, _, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP) - if err != nil { - return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) - } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress) - } else { - // The requested IP is not static and it is not assigned to the - // current forwarding rule. It might be attached to a different - // rule or it might not be part of this project at all. Either - // way, we can't use it. - return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err) - } - } else { - // The user did not request a specific IP. - isUserOwnedIP = false - - // This will either allocate a new static IP if the forwarding rule didn't - // already have an IP, or it will promote the forwarding rule's current - // IP from ephemeral to static, or it will just get the IP if it is - // already static. - existed := false - ipAddress, existed, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP) - if err != nil { - return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) - } - if existed { - // If the IP was not specifically requested by the user, but it - // already existed, it seems to be a failed update cycle. We can - // use this IP and try to run through the process again, but we - // should not release the IP unless it is explicitly flagged as OK. - isSafeToReleaseIP = false - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress) - } else { - // For total clarity. 
The IP did not pre-exist and the user did - // not ask for a particular one, so we can release the IP in case - // of failure or success. - isSafeToReleaseIP = true - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress) - } - } - - // Deal with the firewall next. The reason we do this here rather than last - // is because the forwarding rule is used as the indicator that the load - // balancer is fully created - it's what getLoadBalancer checks for. - // Check if user specified the allow source range - sourceRanges, err := apiservice.GetLoadBalancerSourceRanges(apiService) - if err != nil { - return nil, err - } - - firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges) - if err != nil { - return nil, err - } - - if firewallNeedsUpdate { - desc := makeFirewallDescription(serviceName.String(), ipAddress) - // Unlike forwarding rules and target pools, firewalls can be updated - // without needing to be deleted and recreated. - if firewallExists { - glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName) - if err := gce.updateFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil { - return nil, err - } - glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName) - } else { - glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName) - if err := gce.createFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil { - return nil, err - } - glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName) - } - } - - tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType) - if err != nil { - return nil, err - } - if !tpExists { - glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name) - } - +func (gce *GCECloud) EnsureLoadBalancer(clusterName string, svc *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { + loadBalancerName := cloudprovider.GetLoadBalancerName(svc) + desiredScheme := getSvcScheme(svc) clusterID, err := gce.ClusterID.GetID() if err != nil { - return nil, fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err) + return nil, err } - // Check which health check needs to create and which health check needs to delete. - // Health check management is coupled with target pool operation to prevent leaking. 
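Editor's distillation (not part of this change) of the health-check selection the legacy block below performs, using the pre-refactor helper names:

func chooseHealthCheck(svc *v1.Service, lbName, clusterID string, nodesHCSupported bool) *compute.HttpHealthCheck {
	if path, hcPort := apiservice.GetServiceHealthCheckPathPort(svc); path != "" {
		// ExternalTrafficPolicy: Local → per-LB check on the service's healthCheckNodePort.
		return makeHttpHealthCheck(lbName, path, hcPort)
	}
	if nodesHCSupported {
		// Default: one shared check against every node.
		return makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort())
	}
	return nil
}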
- var hcToCreate, hcToDelete *compute.HttpHealthCheck - hcLocalTrafficExisting, err := gce.GetHttpHealthCheck(loadBalancerName) - if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { - return nil, fmt.Errorf("error checking HTTP health check %s: %v", loadBalancerName, err) + + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region, desiredScheme) + + existingFwdRule, err := gce.GetRegionForwardingRule(loadBalancerName, gce.region) + if err != nil && !isNotFound(err) { + return nil, err } - if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" { - glog.V(4).Infof("service %v (%v) needs local traffic health checks on: %d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path) - if hcLocalTrafficExisting == nil { - // This logic exists to detect a transition for non-OnlyLocal to OnlyLocal service - // turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the - // target pool to use local traffic health check. - glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName) - if supportsNodesHealthCheck { - hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort()) + + if existingFwdRule != nil { + existingScheme := lbScheme(strings.ToUpper(existingFwdRule.LoadBalancingScheme)) + + // If the loadbalancer type changes between INTERNAL and EXTERNAL, the old load balancer should be deleted. + if existingScheme != desiredScheme { + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): deleting existing %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region, existingScheme) + switch existingScheme { + case schemeInternal: + err = gce.ensureInternalLoadBalancerDeleted(clusterName, clusterID, svc) + default: + err = gce.ensureExternalLoadBalancerDeleted(clusterName, svc) } - tpNeedsUpdate = true - } - hcToCreate = makeHttpHealthCheck(loadBalancerName, path, healthCheckNodePort) - } else { - glog.V(4).Infof("Service %v needs nodes health checks.", apiService.Name) - if hcLocalTrafficExisting != nil { - // This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service - // and turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the - // target pool to use nodes health check. - glog.V(2).Infof("Updating from local traffic health checks to nodes health checks for service %v LB %v", apiService.Name, loadBalancerName) - hcToDelete = hcLocalTrafficExisting - tpNeedsUpdate = true - } - if supportsNodesHealthCheck { - hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort()) - } - } - // Now we get to some slightly more interesting logic. - // First, neither target pools nor forwarding rules can be updated in place - - // they have to be deleted and recreated. - // Second, forwarding rules are layered on top of target pools in that you - // can't delete a target pool that's currently in use by a forwarding rule. - // Thus, we have to tear down the forwarding rule if either it or the target - // pool needs to be updated. - if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) { - // Begin critical section. If we have to delete the forwarding rule, - // and something should fail before we recreate it, don't release the - // IP. 
That way we can come back to it later. - isSafeToReleaseIP = false - if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil { - return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err) - } - glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName) - } - if tpExists && tpNeedsUpdate { - // Pass healthchecks to deleteTargetPool to cleanup health checks after cleaning up the target pool itself. - var hcNames []string - if hcToDelete != nil { - hcNames = append(hcNames, hcToDelete.Name) - } - if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil { - return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err) - } - glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName) - } - - // Once we've deleted the resources (if necessary), build them back up (or for - // the first time if they're new). - if tpNeedsUpdate { - createInstances := hosts - if len(hosts) > maxTargetPoolCreateInstances { - createInstances = createInstances[:maxTargetPoolCreateInstances] - } - // Pass healthchecks to createTargetPool which needs them as health check links in the target pool - if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddress, gce.region, createInstances, affinityType, hcToCreate); err != nil { - return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err) - } - if hcToCreate != nil { - glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks %v for target pool", loadBalancerName, serviceName, hcToCreate.Name) - } - if len(hosts) <= maxTargetPoolCreateInstances { - glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName) - } else { - glog.Infof("EnsureLoadBalancer(%v(%v)): created initial target pool (now updating with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances) - - created := sets.NewString() - for _, host := range createInstances { - created.Insert(host.makeComparableHostPath()) + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): done deleting existing %v loadbalancer. err: %v", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region, existingScheme, err) + if err != nil { + return nil, err } - if err := gce.updateTargetPool(loadBalancerName, created, hosts); err != nil { - return nil, fmt.Errorf("failed to update target pool %s: %v", loadBalancerName, err) - } - glog.Infof("EnsureLoadBalancer(%v(%v)): updated target pool (with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances) } } - if tpNeedsUpdate || fwdRuleNeedsUpdate { - glog.Infof("EnsureLoadBalancer(%v(%v)): creating forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress) - if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil { - return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err) - } - // End critical section. It is safe to release the static IP (which - // just demotes it to ephemeral) now that it is attached. In the case - // of a user-requested IP, the "is user-owned" flag will be set, - // preventing it from actually being released. 
- isSafeToReleaseIP = true - glog.Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress) + + var status *v1.LoadBalancerStatus + switch desiredScheme { + case schemeInternal: + status, err = gce.ensureInternalLoadBalancer(clusterName, clusterID, svc, existingFwdRule, nodes) + default: + status, err = gce.ensureExternalLoadBalancer(clusterName, svc, existingFwdRule, nodes) } - - status := &v1.LoadBalancerStatus{} - status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddress}} - - return status, nil + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): done ensuring loadbalancer. err: %v", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region, err) + return status, err } // UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer. -func (gce *GCECloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error { - hosts, err := gce.getInstancesByNames(nodeNames(nodes)) +func (gce *GCECloud) UpdateLoadBalancer(clusterName string, svc *v1.Service, nodes []*v1.Node) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(svc) + scheme := getSvcScheme(svc) + clusterID, err := gce.ClusterID.GetID() if err != nil { return err } - loadBalancerName := cloudprovider.GetLoadBalancerName(service) - pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do() - if err != nil { - return err - } - existing := sets.NewString() - for _, instance := range pool.Instances { - existing.Insert(hostURLToComparablePath(instance)) - } + glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v, %v): updating with %d nodes", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region, len(nodes)) - return gce.updateTargetPool(loadBalancerName, existing, hosts) + switch scheme { + case schemeInternal: + err = gce.updateInternalLoadBalancer(clusterName, clusterID, svc, nodes) + default: + err = gce.updateExternalLoadBalancer(clusterName, svc, nodes) + } + glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v, %v): done updating. err: %v", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region, err) + return err } // EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted. -func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error { - loadBalancerName := cloudprovider.GetLoadBalancerName(service) - glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName, - gce.region) - - var hcNames []string - if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" { - hcToDelete, err := gce.GetHttpHealthCheck(loadBalancerName) - if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err) - return err - } - hcNames = append(hcNames, hcToDelete.Name) - } else { - clusterID, err := gce.ClusterID.GetID() - if err != nil { - return fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err) - } - // EnsureLoadBalancerDeleted() could be triggered by changing service from - // LoadBalancer type to others. In this case we have no idea whether it was - // using local traffic health check or nodes health check. Attempt to delete - // both to prevent leaking. 
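Caller-level sketch of the scheme dispatch above (editor's note, hypothetical values): re-annotating a Service flips its desired scheme, and the next Ensure tears down the mismatched load balancer before building the new one.

func exampleSwitchToInternal(gce *GCECloud, svc *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
	if svc.Annotations == nil {
		svc.Annotations = map[string]string{}
	}
	svc.Annotations[ServiceAnnotationLoadBalancerType] = string(LBTypeInternal)
	// The existing EXTERNAL forwarding rule no longer matches the desired
	// INTERNAL scheme, so EnsureLoadBalancer deletes the external LB first.
	return gce.EnsureLoadBalancer("example-cluster", svc, nodes)
}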
- hcNames = append(hcNames, loadBalancerName) - hcNames = append(hcNames, makeNodesHealthCheckName(clusterID)) - } - - errs := utilerrors.AggregateGoroutines( - func() error { return gce.deleteFirewall(makeFirewallName(loadBalancerName), gce.region) }, - // Even though we don't hold on to static IPs for load balancers, it's - // possible that EnsureLoadBalancer left one around in a failed - // creation/update attempt, so make sure we clean it up here just in case. - func() error { return gce.deleteStaticIP(loadBalancerName, gce.region) }, - func() error { - // The forwarding rule must be deleted before either the target pool can, - // unfortunately, so we have to do these two serially. - if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil { - return err - } - if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil { - return err - } - return nil - }, - ) - if errs != nil { - return utilerrors.Flatten(errs) - } - return nil -} - -func (gce *GCECloud) DeleteForwardingRule(name string) error { - region, err := GetGCERegion(gce.localZone) - if err != nil { - return err - } - return gce.deleteForwardingRule(name, region) -} - -func (gce *GCECloud) deleteForwardingRule(name, region string) error { - mc := newForwardingRuleMetricContext("delete", region) - op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() - - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name) - } else if err != nil { - glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error()) - return mc.Observe(err) - } else { - if err := gce.waitForRegionOp(op, region, mc); err != nil { - glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.", - name, err.Error()) - return err - } - } - return nil -} - -// DeleteTargetPool deletes the given target pool. -func (gce *GCECloud) DeleteTargetPool(name string, hcNames ...string) error { - region, err := GetGCERegion(gce.localZone) - if err != nil { - return err - } - return gce.deleteTargetPool(name, region, hcNames...) -} - -func (gce *GCECloud) deleteTargetPool(name, region string, hcNames ...string) error { - mc := newTargetPoolMetricContext("delete", region) - op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do() - - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name) - } else if err != nil { - glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error()) - return mc.Observe(err) - } else { - if err := gce.waitForRegionOp(op, region, mc); err != nil { - glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.", - name, err.Error()) - return err - } - } - - // Deletion of health checks is allowed only after the TargetPool reference is deleted - for _, hcName := range hcNames { - if err = func() error { - // Check whether it is nodes health check, which has different name from the load-balancer. - isNodesHealthCheck := hcName != name - if isNodesHealthCheck { - // Lock to prevent deleting necessary nodes health check before it gets attached - // to target pool. 
- gce.sharedResourceLock.Lock() - defer gce.sharedResourceLock.Unlock() - } - glog.Infof("Deleting health check %v", hcName) - if err := gce.DeleteHttpHealthCheck(hcName); err != nil { - // Delete nodes health checks will fail if any other target pool is using it. - if isInUsedByError(err) { - glog.V(4).Infof("Health check %v is in used: %v.", hcName, err) - return nil - } else if !isHTTPErrorCode(err, http.StatusNotFound) { - glog.Warningf("Failed to delete health check %v: %v", hcName, err) - return err - } - // StatusNotFound could happen when: - // - This is the first attempt but we pass in a healthcheck that is already deleted - // to prevent leaking. - // - This is the first attempt but user manually deleted the heathcheck. - // - This is a retry and in previous round we failed to delete the healthcheck firewall - // after deleted the healthcheck. - // We continue to delete the healthcheck firewall to prevent leaking. - glog.V(4).Infof("Health check %v is already deleted.", hcName) - } - clusterID, err := gce.ClusterID.GetID() - if err != nil { - return fmt.Errorf("error getting cluster ID: %v", err) - } - // If health check is deleted without error, it means no load-balancer is using it. - // So we should delete the health check firewall as well. - fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck) - glog.Infof("Deleting firewall %v.", fwName) - if err := gce.DeleteFirewall(fwName); err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - glog.V(4).Infof("Firewall %v is already deleted.", fwName) - return nil - } - return err - } - return nil - }(); err != nil { - return err - } - } - - return nil -} - -func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error { - // health check management is coupled with targetPools to prevent leaks. A - // target pool is the only thing that requires a health check, so we delete - // associated checks on teardown, and ensure checks on setup. - hcLinks := []string{} - if hc != nil { - // Check whether it is nodes health check, which has different name from the load-balancer. - isNodesHealthCheck := hc.Name != name - if isNodesHealthCheck { - // Lock to prevent necessary nodes health check / firewall gets deleted. 
- gce.sharedResourceLock.Lock() - defer gce.sharedResourceLock.Unlock() - } - if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, gce.region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil { - return err - } - var err error - if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil { - return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hc.Port, hc.RequestPath, err) - } - hcLinks = append(hcLinks, hc.SelfLink) - } - - var instances []string - for _, host := range hosts { - instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name)) - } - glog.Infof("Creating targetpool %v with %d healthchecks", name, len(hcLinks)) - pool := &compute.TargetPool{ - Name: name, - Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName), - Instances: instances, - SessionAffinity: translateAffinityType(affinityType), - HealthChecks: hcLinks, - } - - mc := newTargetPoolMetricContext("insert", region) - op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do() - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return mc.Observe(err) - } - if op != nil { - err = gce.waitForRegionOp(op, region, mc) - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return err - } - } - return nil -} - -func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.String, hosts []*gceInstance) error { - var toAdd []*compute.InstanceReference - var toRemove []*compute.InstanceReference - for _, host := range hosts { - link := host.makeComparableHostPath() - if !existing.Has(link) { - toAdd = append(toAdd, &compute.InstanceReference{Instance: link}) - } - existing.Delete(link) - } - for link := range existing { - toRemove = append(toRemove, &compute.InstanceReference{Instance: link}) - } - - if len(toAdd) > 0 { - add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd} - - mc := newTargetPoolMetricContext("update", gce.region) - op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do() - if err != nil { - return mc.Observe(err) - } - if err := gce.waitForRegionOp(op, gce.region, mc); err != nil { - return err - } - } - - if len(toRemove) > 0 { - mc := newTargetPoolMetricContext("delete", gce.region) - rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove} - op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do() - if err != nil { - return mc.Observe(err) - } - if err := gce.waitForRegionOp(op, gce.region, mc); err != nil { - return err - } - } - - // Try to verify that the correct number of nodes are now in the target pool. - // We've been bitten by a bug here before (#11327) where all nodes were - // accidentally removed and want to make similar problems easier to notice. - updatedPool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do() - if err != nil { - return err - } - if len(updatedPool.Instances) != len(hosts) { - glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). 
Instances in updated pool: %s", - len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ",")) - return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts)) - } - return nil -} - -func (gce *GCECloud) targetPoolURL(name, region string) string { - return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) -} - -func makeHttpHealthCheck(name, path string, port int32) *compute.HttpHealthCheck { - return &compute.HttpHealthCheck{ - Name: name, - Port: int64(port), - RequestPath: path, - Host: "", - Description: makeHealthCheckDescription(name), - CheckIntervalSec: gceHcCheckIntervalSeconds, - TimeoutSec: gceHcTimeoutSeconds, - HealthyThreshold: gceHcHealthyThreshold, - UnhealthyThreshold: gceHcUnhealthyThreshold, - } -} - -func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) { - newHC := makeHttpHealthCheck(name, path, port) - hc, err = gce.GetHttpHealthCheck(name) - if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path) - if err = gce.CreateHttpHealthCheck(newHC); err != nil { - return nil, err - } - hc, err = gce.GetHttpHealthCheck(name) - if err != nil { - glog.Errorf("Failed to get http health check %v", err) - return nil, err - } - glog.Infof("Created HTTP health check %v healthCheckNodePort: %d", name, port) - return hc, nil - } - // Validate health check fields - glog.V(4).Infof("Checking http health check params %s", name) - drift := hc.Port != int64(port) || hc.RequestPath != path || hc.Description != makeHealthCheckDescription(name) - drift = drift || hc.CheckIntervalSec != gceHcCheckIntervalSeconds || hc.TimeoutSec != gceHcTimeoutSeconds - drift = drift || hc.UnhealthyThreshold != gceHcUnhealthyThreshold || hc.HealthyThreshold != gceHcHealthyThreshold - if drift { - glog.Warningf("Health check %v exists but parameters have drifted - updating...", name) - if err := gce.UpdateHttpHealthCheck(newHC); err != nil { - glog.Warningf("Failed to reconcile http health check %v parameters", name) - return nil, err - } - glog.V(4).Infof("Corrected health check %v parameters successful", name) - } - return hc, nil -} - -// Passing nil for requested IP is perfectly fine - it just means that no specific -// IP is being requested. -// Returns whether the forwarding rule exists, whether it needs to be updated, -// what its IP address is (if it exists), and any error we encountered. -func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []v1.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) { - fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() - if err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - return false, true, "", nil - } - // Err on the side of caution in case of errors. Caller should notice the error and retry. - // We never want to end up recreating resources because gce api flaked. - return true, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err) - } - // If the user asks for a specific static ip through the Service spec, - // check that we're actually using it. - // TODO: we report loadbalancer IP through status, so we want to verify if - // that matches the forwarding rule as well. 
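For reference (editor's note, not part of this change), the range compared below comes from the legacy loadBalancerPortRange helper further down in this file:

func examplePortRange() {
	ports := []v1.ServicePort{
		{Protocol: v1.ProtocolTCP, Port: 80},
		{Protocol: v1.ProtocolTCP, Port: 443},
	}
	portRange, _ := loadBalancerPortRange(ports)
	_ = portRange // "80-443": min-max across all ports; protocol taken from ports[0]
}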
- if loadBalancerIP != "" && loadBalancerIP != fwd.IPAddress { - glog.Infof("LoadBalancer ip for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPAddress, loadBalancerIP) - return true, true, fwd.IPAddress, nil - } - portRange, err := loadBalancerPortRange(ports) - if err != nil { - // Err on the side of caution in case of errors. Caller should notice the error and retry. - // We never want to end up recreating resources because gce api flaked. - return true, false, "", err - } - if portRange != fwd.PortRange { - glog.Infof("LoadBalancer port range for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.PortRange, portRange) - return true, true, fwd.IPAddress, nil - } - // The service controller verified all the protocols match on the ports, just check the first one - if string(ports[0].Protocol) != fwd.IPProtocol { - glog.Infof("LoadBalancer protocol for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPProtocol, string(ports[0].Protocol)) - return true, true, fwd.IPAddress, nil - } - - return true, false, fwd.IPAddress, nil -} - -// Doesn't check whether the hosts have changed, since host updating is handled -// separately. -func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsUpdate bool, err error) { - tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() - if err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - return false, true, nil - } - // Err on the side of caution in case of errors. Caller should notice the error and retry. - // We never want to end up recreating resources because gce api flaked. - return true, false, fmt.Errorf("error getting load balancer's target pool: %v", err) - } - // TODO: If the user modifies their Service's session affinity, it *should* - // reflect in the associated target pool. However, currently not setting the - // session affinity on a target pool defaults it to the empty string while - // not setting in on a Service defaults it to None. There is a lack of - // documentation around the default setting for the target pool, so if we - // find it's the undocumented empty string, don't blindly recreate the - // target pool (which results in downtime). Fix this when we have formally - // defined the defaults on either side. 
- if tp.SessionAffinity != "" && translateAffinityType(affinityType) != tp.SessionAffinity { - glog.Infof("LoadBalancer target pool %v changed affinity from %v to %v", name, tp.SessionAffinity, affinityType) - return true, true, nil - } - return true, false, nil -} - -func (h *gceInstance) makeComparableHostPath() string { - return fmt.Sprintf("/zones/%s/instances/%s", h.Zone, h.Name) -} - -func nodeNames(nodes []*v1.Node) []string { - ret := make([]string, len(nodes)) - for i, node := range nodes { - ret[i] = node.Name - } - return ret -} - -func makeHostURL(projectID, zone, host string) string { - host = canonicalizeInstanceName(host) - return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/instances/%s", - projectID, zone, host) -} - -func hostURLToComparablePath(hostURL string) string { - idx := strings.Index(hostURL, "/zones/") - if idx < 0 { - return "" - } - return hostURL[idx:] -} - -func makeHealthCheckDescription(serviceName string) string { - return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName) -} - -func loadBalancerPortRange(ports []v1.ServicePort) (string, error) { - if len(ports) == 0 { - return "", fmt.Errorf("no ports specified for GCE load balancer") - } - - // The service controller verified all the protocols match on the ports, just check and use the first one - if ports[0].Protocol != v1.ProtocolTCP && ports[0].Protocol != v1.ProtocolUDP { - return "", fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(ports[0].Protocol)) - } - - minPort := int32(65536) - maxPort := int32(0) - for i := range ports { - if ports[i].Port < minPort { - minPort = ports[i].Port - } - if ports[i].Port > maxPort { - maxPort = ports[i].Port - } - } - return fmt.Sprintf("%d-%d", minPort, maxPort), nil -} - -// translate from what K8s supports to what the cloud provider supports for session affinity. -func translateAffinityType(affinityType v1.ServiceAffinity) string { - switch affinityType { - case v1.ServiceAffinityClientIP: - return gceAffinityTypeClientIP - case v1.ServiceAffinityNone: - return gceAffinityTypeNone - default: - glog.Errorf("Unexpected affinity type: %v", affinityType) - return gceAffinityTypeNone - } -} - -func makeFirewallName(name string) string { - return fmt.Sprintf("k8s-fw-%s", name) -} - -func makeFirewallDescription(serviceName, ipAddress string) string { - return fmt.Sprintf(`{"kubernetes.io/service-name":"%s", "kubernetes.io/service-ip":"%s"}`, - serviceName, ipAddress) -} - -func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []v1.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) { - fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do() - if err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - return false, true, nil - } - return false, false, fmt.Errorf("error getting load balancer's firewall: %v", err) - } - if fw.Description != makeFirewallDescription(serviceName, ipAddress) { - return true, true, nil - } - if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") { - return true, true, nil - } - // Make sure the allowed ports match. - allowedPorts := make([]string, len(ports)) - for ix := range ports { - allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port)) - } - if !equalStringSets(allowedPorts, fw.Allowed[0].Ports) { - return true, true, nil - } - // The service controller already verified that the protocol matches on all ports, no need to check. 
- - actualSourceRanges, err := netsets.ParseIPNets(fw.SourceRanges...) - if err != nil { - // This really shouldn't happen... GCE has returned something unexpected - glog.Warningf("Error parsing firewall SourceRanges: %v", fw.SourceRanges) - // We don't return the error, because we can hopefully recover from this by reconfiguring the firewall - return true, true, nil - } - - if !sourceRanges.Equal(actualSourceRanges) { - return true, true, nil - } - return true, false, nil -} - -func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, region string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error { +func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, svc *v1.Service) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(svc) + scheme := getSvcScheme(svc) clusterID, err := gce.ClusterID.GetID() - if err != nil { - return fmt.Errorf("error getting cluster ID: %v", err) - } - - // Prepare the firewall params for creating / checking. - desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID) - if !isNodesHealthCheck { - desc = makeFirewallDescription(serviceName, ipAddress) - } - sourceRanges := lbSrcRngsFlag.ipn - ports := []v1.ServicePort{{Protocol: "tcp", Port: hcPort}} - - fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck) - fw, err := gce.service.Firewalls.Get(gce.projectID, fwName).Do() - if err != nil { - if !isHTTPErrorCode(err, http.StatusNotFound) { - return fmt.Errorf("error getting firewall for health checks: %v", err) - } - glog.Infof("Creating firewall %v for health checks.", fwName) - if err := gce.createFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil { - return err - } - glog.Infof("Created firewall %v for health checks.", fwName) - return nil - } - // Validate firewall fields. 
- if fw.Description != desc || - len(fw.Allowed) != 1 || - fw.Allowed[0].IPProtocol != string(ports[0].Protocol) || - !equalStringSets(fw.Allowed[0].Ports, []string{string(ports[0].Port)}) || - !equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) { - glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName) - if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil { - glog.Warningf("Failed to reconcile firewall %v parameters.", fwName) - return err - } - glog.V(4).Infof("Corrected firewall %v parameters successful", fwName) - } - return nil -} - -func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []v1.ServicePort) error { - portRange, err := loadBalancerPortRange(ports) if err != nil { return err } - req := &compute.ForwardingRule{ - Name: name, - Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName), - IPAddress: ipAddress, - IPProtocol: string(ports[0].Protocol), - PortRange: portRange, - Target: gce.targetPoolURL(name, region), - } - mc := newForwardingRuleMetricContext("create", region) - op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return mc.Observe(err) + glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v): deleting loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region) + + switch scheme { + case schemeInternal: + err = gce.ensureInternalLoadBalancerDeleted(clusterName, clusterID, svc) + default: + err = gce.ensureExternalLoadBalancerDeleted(clusterName, svc) } - if op != nil { - err = gce.waitForRegionOp(op, region, mc) - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return err - } - } - return nil + glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v): done deleting loadbalancer. 
err: %v", clusterName, svc.Namespace, svc.Name, loadBalancerName, gce.region, err) + return err } -func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error { - mc := newFirewallMetricContext("create") - firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) - if err != nil { - return mc.Observe(err) +func getSvcScheme(svc *v1.Service) lbScheme { + if typ, ok := GetLoadBalancerAnnotationType(svc); ok && typ == LBTypeInternal { + return schemeInternal } - op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do() - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return mc.Observe(err) - } - if op != nil { - err = gce.waitForGlobalOp(op, mc) - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return err - } - } - return nil -} - -func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error { - mc := newFirewallMetricContext("update") - firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) - if err != nil { - return mc.Observe(err) - } - op, err := gce.service.Firewalls.Update(gce.projectID, name, firewall).Do() - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return mc.Observe(err) - } - if op != nil { - err = gce.waitForGlobalOp(op, mc) - if err != nil { - return err - } - } - return nil -} - -func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { - allowedPorts := make([]string, len(ports)) - for ix := range ports { - allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port)) - } - // If the node tags to be used for this cluster have been predefined in the - // provider config, just use them. Otherwise, invoke computeHostTags method to get the tags. - hostTags := gce.nodeTags - if len(hostTags) == 0 { - var err error - if hostTags, err = gce.computeHostTags(hosts); err != nil { - return nil, fmt.Errorf("No node tags supplied and also failed to parse the given lists of hosts for tags. Abort creating firewall rule.") - } - } - - firewall := &compute.Firewall{ - Name: name, - Description: desc, - Network: gce.networkURL, - SourceRanges: sourceRanges.StringSlice(), - TargetTags: hostTags, - Allowed: []*compute.FirewallAllowed{ - { - // TODO: Make this more generic. Currently this method is only - // used to create firewall rules for loadbalancers, which have - // exactly one protocol, so we can never end up with a list of - // mixed TCP and UDP ports. It should be possible to use a - // single firewall rule for both a TCP and UDP lb. - IPProtocol: strings.ToLower(string(ports[0].Protocol)), - Ports: allowedPorts, - }, - }, - } - return firewall, nil -} - -func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) { - pageToken := "" - page := 0 - for ; page == 0 || (pageToken != "" && page < maxPages); page++ { - listCall := gce.service.Addresses.List(gce.projectID, region) - if pageToken != "" { - listCall = listCall.PageToken(pageToken) - } - addresses, err := listCall.Do() - if err != nil { - return false, fmt.Errorf("failed to list gce IP addresses: %v", err) - } - pageToken = addresses.NextPageToken - for _, addr := range addresses.Items { - if addr.Address == ipAddress { - // This project does own the address, so return success. 
- return true, nil - } - } - } - if page >= maxPages { - glog.Errorf("projectOwnsStaticIP exceeded maxPages=%d for Addresses.List; truncating.", maxPages) - } - return false, nil -} - -func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string) (ipAddress string, created bool, err error) { - // If the address doesn't exist, this will create it. - // If the existingIP exists but is ephemeral, this will promote it to static. - // If the address already exists, this will harmlessly return a StatusConflict - // and we'll grab the IP before returning. - existed := false - addressObj := &compute.Address{ - Name: name, - Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName), - } - - if existingIP != "" { - addressObj.Address = existingIP - } - - mc := newAddressMetricContext("create", region) - op, err := gce.service.Addresses.Insert(gce.projectID, region, addressObj).Do() - if err != nil { - if !isHTTPErrorCode(err, http.StatusConflict) { - return "", false, fmt.Errorf("error creating gce static IP address: %v", - mc.Observe(err)) - } - // StatusConflict == the IP exists already. - existed = true - } - if op != nil { - err := gce.waitForRegionOp(op, region, mc) - if err != nil { - if !isHTTPErrorCode(err, http.StatusConflict) { - return "", false, fmt.Errorf("error waiting for gce static IP address to be created: %v", err) - } - // StatusConflict == the IP exists already. - existed = true - } - } - - // We have to get the address to know which IP was allocated for us. - address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do() - if err != nil { - return "", false, fmt.Errorf("error re-getting gce static IP address: %v", err) - } - return address.Address, existed, nil -} - -func (gce *GCECloud) deleteFirewall(name, region string) error { - mc := newFirewallMetricContext("delete") - op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do() - - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name) - } else if err != nil { - glog.Warningf("Failed to delete firewall %s, got error %v", name, err) - return mc.Observe(err) - } else { - if err := gce.waitForGlobalOp(op, mc); err != nil { - glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", name, err) - return err - } - } - return nil -} - -func (gce *GCECloud) deleteStaticIP(name, region string) error { - mc := newAddressMetricContext("delete", region) - op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do() - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Static IP address %s is not reserved", name) - } else if err != nil { - glog.Warningf("Failed to delete static IP address %s, got error %v", name, err) - return mc.Observe(err) - } else { - if err := gce.waitForRegionOp(op, region, mc); err != nil { - glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err) - return err - } - } - return nil + return schemeExternal } diff --git a/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go b/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go new file mode 100644 index 00000000000..74aefee65ae --- /dev/null +++ b/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go @@ -0,0 +1,952 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gce + +import ( + "fmt" + "net/http" + "strconv" + "strings" + + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/api/v1" + apiservice "k8s.io/kubernetes/pkg/api/v1/service" + "k8s.io/kubernetes/pkg/cloudprovider" + netsets "k8s.io/kubernetes/pkg/util/net/sets" + + "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" +) + +// ensureExternalLoadBalancer is the external implementation of LoadBalancer.EnsureLoadBalancer. +// Our load balancers in GCE consist of four separate GCE resources - a static +// IP address, a firewall rule, a target pool, and a forwarding rule. This +// function has to manage all of them. +// +// Due to an interesting series of design decisions, this handles both creating +// new load balancers and updating existing load balancers, recognizing when +// each is needed. +func (gce *GCECloud) ensureExternalLoadBalancer(clusterName string, apiService *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { + if len(nodes) == 0 { + return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts") + } + + hostNames := nodeNames(nodes) + supportsNodesHealthCheck := supportsNodesHealthCheck(nodes) + hosts, err := gce.getInstancesByNames(hostNames) + if err != nil { + return nil, err + } + + loadBalancerName := cloudprovider.GetLoadBalancerName(apiService) + loadBalancerIP := apiService.Spec.LoadBalancerIP + ports := apiService.Spec.Ports + portStr := []string{} + for _, p := range apiService.Spec.Ports { + portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port)) + } + + affinityType := apiService.Spec.SessionAffinity + + serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name} + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", + loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations) + + // Check if the forwarding rule exists, and if so, what its IP is. + fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports) + if err != nil { + return nil, err + } + if !fwdRuleExists { + glog.V(2).Infof("Forwarding rule %v for Service %v/%v doesn't exist", + loadBalancerName, apiService.Namespace, apiService.Name) + } + + // Make sure we know which IP address will be used and have properly reserved + // it as static before moving forward with the rest of our operations. + // + // We use static IP addresses when updating a load balancer to ensure that we + // can replace the load balancer's other components without changing the + // address its service is reachable on. We do it this way rather than always + // keeping the static IP around even though this is more complicated because + // it makes it less likely that we'll run into quota issues. Only 7 static + // IP addresses are allowed per region by default. 
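+	//
+	// As a rough sketch (names as used in this file), the reserve-then-release
+	// cycle this function manages looks like:
+	//
+	//   ip, existed, err := gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, "")
+	//   ... create firewall, target pool and forwarding rule against ip ...
+	//   err = gce.DeleteRegionAddress(loadBalancerName, gce.region) // demotes ip back to ephemeral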
+	//
+	// We could let an IP be allocated for us when the forwarding rule is created,
+	// but we need the IP to set up the firewall rule, and we want to keep the
+	// forwarding rule creation as the last thing that needs to be done in this
+	// function in order to maintain the invariant that "if the forwarding rule
+	// exists, the LB has been fully created".
+	ipAddress := ""
+
+	// Through this process we try to keep track of whether it is safe to
+	// release the IP that was allocated. If the user specifically asked for
+	// an IP, we assume they are managing it themselves. Otherwise, we will
+	// release the IP in case of early-terminating failure or upon successful
+	// creation of the LB.
+	// TODO(#36535): boil this logic down into a set of component functions
+	// and key the flag values off of errors returned.
+	isUserOwnedIP := false // if this is set, we never release the IP
+	isSafeToReleaseIP := false
+	defer func() {
+		if isUserOwnedIP {
+			return
+		}
+		if isSafeToReleaseIP {
+			if err := gce.DeleteRegionAddress(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
+				glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
+			} else if isNotFound(err) {
+				glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): address %s is not reserved.", loadBalancerName, serviceName, ipAddress)
+			} else {
+				glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
+			}
+		} else {
+			glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
+		}
+	}()
+
+	if loadBalancerIP != "" {
+		// If a specific IP address has been requested, we have to respect the
+		// user's request and use that IP. If the forwarding rule was already using
+		// a different IP, it will be harmlessly abandoned because it was only an
+		// ephemeral IP (or it was a different static IP owned by the user, in which
+		// case we shouldn't delete it anyway).
+		if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil {
+			return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err)
+		} else if isStatic {
+			// The requested IP is a static IP, owned and managed by the user.
+			isUserOwnedIP = true
+			isSafeToReleaseIP = false
+			ipAddress = loadBalancerIP
+			glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress)
+		} else if loadBalancerIP == fwdRuleIP {
+			// The requested IP is not a static IP, but is currently assigned
+			// to this forwarding rule, so we can keep it.
+			isUserOwnedIP = false
+			isSafeToReleaseIP = true
+			ipAddress, _, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
+			if err != nil {
+				return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
+			}
+			glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress)
+		} else {
+			// The requested IP is not static and it is not assigned to the
+			// current forwarding rule. It might be attached to a different
+			// rule or it might not be part of this project at all. Either
+			// way, we can't use it.
+ return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err) + } + } else { + // The user did not request a specific IP. + isUserOwnedIP = false + + // This will either allocate a new static IP if the forwarding rule didn't + // already have an IP, or it will promote the forwarding rule's current + // IP from ephemeral to static, or it will just get the IP if it is + // already static. + existed := false + ipAddress, existed, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP) + if err != nil { + return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) + } + if existed { + // If the IP was not specifically requested by the user, but it + // already existed, it seems to be a failed update cycle. We can + // use this IP and try to run through the process again, but we + // should not release the IP unless it is explicitly flagged as OK. + isSafeToReleaseIP = false + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress) + } else { + // For total clarity. The IP did not pre-exist and the user did + // not ask for a particular one, so we can release the IP in case + // of failure or success. + isSafeToReleaseIP = true + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress) + } + } + + // Deal with the firewall next. The reason we do this here rather than last + // is because the forwarding rule is used as the indicator that the load + // balancer is fully created - it's what getLoadBalancer checks for. + // Check if user specified the allow source range + sourceRanges, err := apiservice.GetLoadBalancerSourceRanges(apiService) + if err != nil { + return nil, err + } + + firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges) + if err != nil { + return nil, err + } + + if firewallNeedsUpdate { + desc := makeFirewallDescription(serviceName.String(), ipAddress) + // Unlike forwarding rules and target pools, firewalls can be updated + // without needing to be deleted and recreated. + if firewallExists { + glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName) + if err := gce.updateFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil { + return nil, err + } + glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName) + } else { + glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName) + if err := gce.createFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil { + return nil, err + } + glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName) + } + } + + tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType) + if err != nil { + return nil, err + } + if !tpExists { + glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name) + } + + clusterID, err := gce.ClusterID.GetID() + if err != nil { + return nil, fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err) + } + // Check which health check needs to create and which health check needs to delete. 
+	// Health check management is coupled with target pool operation to prevent leaking.
+	var hcToCreate, hcToDelete *compute.HttpHealthCheck
+	hcLocalTrafficExisting, err := gce.GetHttpHealthCheck(loadBalancerName)
+	if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
+		return nil, fmt.Errorf("error checking HTTP health check %s: %v", loadBalancerName, err)
+	}
+	if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
+		glog.V(4).Infof("service %v (%v) needs local traffic health checks on: %d%s", apiService.Name, loadBalancerName, healthCheckNodePort, path)
+		if hcLocalTrafficExisting == nil {
+			// This logic exists to detect a transition from non-OnlyLocal to OnlyLocal service
+			// and turns on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool, updating the
+			// target pool to use the local traffic health check.
+			glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName)
+			if supportsNodesHealthCheck {
+				hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
+			}
+			tpNeedsUpdate = true
+		}
+		hcToCreate = makeHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
+	} else {
+		glog.V(4).Infof("Service %v needs nodes health checks.", apiService.Name)
+		if hcLocalTrafficExisting != nil {
+			// This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service
+			// and turns on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool, updating the
+			// target pool to use the nodes health check.
+			glog.V(2).Infof("Updating from local traffic health checks to nodes health checks for service %v LB %v", apiService.Name, loadBalancerName)
+			hcToDelete = hcLocalTrafficExisting
+			tpNeedsUpdate = true
+		}
+		if supportsNodesHealthCheck {
+			hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
+		}
+	}
+	// Now we get to some slightly more interesting logic.
+	// First, neither target pools nor forwarding rules can be updated in place -
+	// they have to be deleted and recreated.
+	// Second, forwarding rules are layered on top of target pools in that you
+	// can't delete a target pool that's currently in use by a forwarding rule.
+	// Thus, we have to tear down the forwarding rule if either it or the target
+	// pool needs to be updated.
+	if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) {
+		// Begin critical section. If we have to delete the forwarding rule,
+		// and something should fail before we recreate it, don't release the
+		// IP. That way we can come back to it later.
+		isSafeToReleaseIP = false
+		if err := gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
+			return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err)
+		}
+		glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
+	}
+	if tpExists && tpNeedsUpdate {
+		// Pass healthchecks to DeleteExternalTargetPoolAndChecks to clean up health
+		// checks after the target pool itself has been cleaned up.
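+		// For example: a Service switching to OnlyLocal traffic reaches this
+		// point with hcToDelete naming the shared nodes health check, while the
+		// reverse transition passes the per-LB local traffic check; only names
+		// are handed down, and DeleteExternalTargetPoolAndChecks decides what is
+		// actually safe to delete (shared checks are guarded by sharedResourceLock).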
+ var hcNames []string + if hcToDelete != nil { + hcNames = append(hcNames, hcToDelete.Name) + } + if err := gce.DeleteExternalTargetPoolAndChecks(loadBalancerName, gce.region, hcNames...); err != nil { + return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err) + } + glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName) + } + + // Once we've deleted the resources (if necessary), build them back up (or for + // the first time if they're new). + if tpNeedsUpdate { + createInstances := hosts + if len(hosts) > maxTargetPoolCreateInstances { + createInstances = createInstances[:maxTargetPoolCreateInstances] + } + // Pass healthchecks to createTargetPool which needs them as health check links in the target pool + if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddress, gce.region, createInstances, affinityType, hcToCreate); err != nil { + return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err) + } + if hcToCreate != nil { + glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks %v for target pool", loadBalancerName, serviceName, hcToCreate.Name) + } + if len(hosts) <= maxTargetPoolCreateInstances { + glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName) + } else { + glog.Infof("EnsureLoadBalancer(%v(%v)): created initial target pool (now updating with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances) + + created := sets.NewString() + for _, host := range createInstances { + created.Insert(host.makeComparableHostPath()) + } + if err := gce.updateTargetPool(loadBalancerName, created, hosts); err != nil { + return nil, fmt.Errorf("failed to update target pool %s: %v", loadBalancerName, err) + } + glog.Infof("EnsureLoadBalancer(%v(%v)): updated target pool (with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances) + } + } + if tpNeedsUpdate || fwdRuleNeedsUpdate { + glog.Infof("EnsureLoadBalancer(%v(%v)): creating forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress) + if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil { + return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err) + } + // End critical section. It is safe to release the static IP (which + // just demotes it to ephemeral) now that it is attached. In the case + // of a user-requested IP, the "is user-owned" flag will be set, + // preventing it from actually being released. + isSafeToReleaseIP = true + glog.Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress) + } + + status := &v1.LoadBalancerStatus{} + status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddress}} + + return status, nil +} + +// updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer. 
+func (gce *GCECloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
+	hosts, err := gce.getInstancesByNames(nodeNames(nodes))
+	if err != nil {
+		return err
+	}
+
+	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
+	pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
+	if err != nil {
+		return err
+	}
+	existing := sets.NewString()
+	for _, instance := range pool.Instances {
+		existing.Insert(hostURLToComparablePath(instance))
+	}
+
+	return gce.updateTargetPool(loadBalancerName, existing, hosts)
+}
+
+// ensureExternalLoadBalancerDeleted is the external implementation of LoadBalancer.EnsureLoadBalancerDeleted
+func (gce *GCECloud) ensureExternalLoadBalancerDeleted(clusterName string, service *v1.Service) error {
+	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
+
+	var hcNames []string
+	if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" {
+		hcToDelete, err := gce.GetHttpHealthCheck(loadBalancerName)
+		if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
+			glog.Infof("Failed to retrieve health check %v: %v", loadBalancerName, err)
+			return err
+		}
+		// hcToDelete is nil when the health check was already gone (StatusNotFound),
+		// so only record a name when we actually found one.
+		if hcToDelete != nil {
+			hcNames = append(hcNames, hcToDelete.Name)
+		}
+	} else {
+		clusterID, err := gce.ClusterID.GetID()
+		if err != nil {
+			return fmt.Errorf("error getting cluster ID for load balancer %s: %v", loadBalancerName, err)
+		}
+		// EnsureLoadBalancerDeleted() could be triggered by changing service from
+		// LoadBalancer type to others. In this case we have no idea whether it was
+		// using the local traffic health check or the nodes health check. Attempt to
+		// delete both to prevent leaking.
+		hcNames = append(hcNames, loadBalancerName)
+		hcNames = append(hcNames, makeNodesHealthCheckName(clusterID))
+	}
+
+	errs := utilerrors.AggregateGoroutines(
+		func() error { return ignoreNotFound(gce.DeleteFirewall(makeFirewallName(loadBalancerName))) },
+		// Even though we don't hold on to static IPs for load balancers, it's
+		// possible that EnsureLoadBalancer left one around in a failed
+		// creation/update attempt, so make sure we clean it up here just in case.
+		func() error { return ignoreNotFound(gce.DeleteRegionAddress(loadBalancerName, gce.region)) },
+		func() error {
+			// The forwarding rule must be deleted before the target pool can be,
+			// unfortunately, so we have to do these two serially.
+			if err := ignoreNotFound(gce.DeleteRegionForwardingRule(loadBalancerName, gce.region)); err != nil {
+				return err
+			}
+			if err := gce.DeleteExternalTargetPoolAndChecks(loadBalancerName, gce.region, hcNames...); err != nil {
+				return err
+			}
+			return nil
+		},
+	)
+	if errs != nil {
+		return utilerrors.Flatten(errs)
+	}
+	return nil
+}
+
+func (gce *GCECloud) DeleteExternalTargetPoolAndChecks(name, region string, hcNames ...string) error {
+	if err := gce.DeleteTargetPool(name, region); err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
+		glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
+	} else if err != nil {
+		glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error())
+		return err
+	}
+
+	// Deletion of health checks is allowed only after the TargetPool reference is deleted
+	for _, hcName := range hcNames {
+		if err := func() error {
+			// Check whether this is the nodes health check, which has a different name from the load balancer.
+			isNodesHealthCheck := hcName != name
+			if isNodesHealthCheck {
+				// Lock to prevent deleting the necessary nodes health check before it gets attached
+				// to a target pool.
+				gce.sharedResourceLock.Lock()
+				defer gce.sharedResourceLock.Unlock()
+			}
+			glog.Infof("Deleting health check %v", hcName)
+			if err := gce.DeleteHttpHealthCheck(hcName); err != nil {
+				// Deleting the nodes health check will fail if any other target pool is still using it.
+				if isInUsedByError(err) {
+					glog.V(4).Infof("Health check %v is in use: %v.", hcName, err)
+					return nil
+				} else if !isHTTPErrorCode(err, http.StatusNotFound) {
+					glog.Warningf("Failed to delete health check %v: %v", hcName, err)
+					return err
+				}
+				// StatusNotFound could happen when:
+				// - This is the first attempt but we pass in a healthcheck that is already deleted
+				//   to prevent leaking.
+				// - This is the first attempt but the user manually deleted the healthcheck.
+				// - This is a retry and in the previous round we failed to delete the healthcheck firewall
+				//   after deleting the healthcheck.
+				// We continue to delete the healthcheck firewall to prevent leaking.
+				glog.V(4).Infof("Health check %v is already deleted.", hcName)
+			}
+			clusterID, err := gce.ClusterID.GetID()
+			if err != nil {
+				return fmt.Errorf("error getting cluster ID: %v", err)
+			}
+			// If the health check is deleted without error, it means no load balancer is using it.
+			// So we should delete the health check firewall as well.
+			fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
+			glog.Infof("Deleting firewall %v.", fwName)
+			if err := gce.DeleteFirewall(fwName); err != nil {
+				if isHTTPErrorCode(err, http.StatusNotFound) {
+					glog.V(4).Infof("Firewall %v is already deleted.", fwName)
+					return nil
+				}
+				return err
+			}
+			return nil
+		}(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
+	// Health check management is coupled with target pools to prevent leaks. A
+	// target pool is the only thing that requires a health check, so we delete
+	// associated checks on teardown, and ensure checks on setup.
+	hcLinks := []string{}
+	if hc != nil {
+		// Check whether this is the nodes health check, which has a different name from the load balancer.
+		isNodesHealthCheck := hc.Name != name
+		if isNodesHealthCheck {
+			// Lock to prevent the necessary nodes health check / firewall from being deleted.
+			gce.sharedResourceLock.Lock()
+			defer gce.sharedResourceLock.Unlock()
+		}
+		if !gce.OnXPN() {
+			if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
+				return err
+			}
+		}
+		var err error
+		// Capture the requested port and path up front: ensureHttpHealthCheck can
+		// return a nil check, and hc must not be dereferenced in that case.
+		hcPort, hcPath := hc.Port, hc.RequestPath
+		if hc, err = gce.ensureHttpHealthCheck(hc.Name, hcPath, int32(hcPort)); err != nil || hc == nil {
+			return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hcPort, hcPath, err)
+		}
+		hcLinks = append(hcLinks, hc.SelfLink)
+	}
+
+	var instances []string
+	for _, host := range hosts {
+		instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
+	}
+	glog.Infof("Creating targetpool %v with %d healthchecks", name, len(hcLinks))
+	pool := &compute.TargetPool{
+		Name: name,
+		Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
+		Instances: instances,
+		SessionAffinity: translateAffinityType(affinityType),
+		HealthChecks: hcLinks,
+	}
+
+	if _, err := gce.CreateTargetPool(pool, region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
+		return err
+	}
+	return nil
+}
+
+func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.String, hosts []*gceInstance) error {
+	var toAdd []*compute.InstanceReference
+	var toRemove []*compute.InstanceReference
+	for _, host := range hosts {
+		link := host.makeComparableHostPath()
+		if !existing.Has(link) {
+			toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
+		}
+		existing.Delete(link)
+	}
+	for link := range existing {
+		toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
+	}
+
+	if len(toAdd) > 0 {
+		if err := gce.AddInstancesToTargetPool(loadBalancerName, gce.region, toAdd); err != nil {
+			return err
+		}
+	}
+
+	if len(toRemove) > 0 {
+		if err := gce.RemoveInstancesFromTargetPool(loadBalancerName, gce.region, toRemove); err != nil {
+			return err
+		}
+	}
+
+	// Try to verify that the correct number of nodes are now in the target pool.
+	// We've been bitten by a bug here before (#11327) where all nodes were
+	// accidentally removed and want to make similar problems easier to notice.
+	updatedPool, err := gce.GetTargetPool(loadBalancerName, gce.region)
+	if err != nil {
+		return err
+	}
+	if len(updatedPool.Instances) != len(hosts) {
+		glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
+			len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
+		return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
+	}
+	return nil
+}
+
+func (gce *GCECloud) targetPoolURL(name, region string) string {
+	return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
+}
+
+func makeHttpHealthCheck(name, path string, port int32) *compute.HttpHealthCheck {
+	return &compute.HttpHealthCheck{
+		Name: name,
+		Port: int64(port),
+		RequestPath: path,
+		Host: "",
+		Description: makeHealthCheckDescription(name),
+		CheckIntervalSec: gceHcCheckIntervalSeconds,
+		TimeoutSec: gceHcTimeoutSeconds,
+		HealthyThreshold: gceHcHealthyThreshold,
+		UnhealthyThreshold: gceHcUnhealthyThreshold,
+	}
+}
+
+func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) {
+	newHC := makeHttpHealthCheck(name, path, port)
+	hc, err = gce.GetHttpHealthCheck(name)
+	if hc == nil || (err != nil && isHTTPErrorCode(err, http.StatusNotFound)) {
+		glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path)
+		if err = gce.CreateHttpHealthCheck(newHC); err != nil {
+			return nil, err
+		}
+		hc, err = gce.GetHttpHealthCheck(name)
+		if err != nil {
+			glog.Errorf("Failed to get http health check %v", err)
+			return nil, err
+		}
+		glog.Infof("Created HTTP health check %v healthCheckNodePort: %d", name, port)
+		return hc, nil
+	}
+	// Validate health check fields
+	glog.V(4).Infof("Checking http health check params %s", name)
+	drift := hc.Port != int64(port) || hc.RequestPath != path || hc.Description != makeHealthCheckDescription(name)
+	drift = drift || hc.CheckIntervalSec != gceHcCheckIntervalSeconds || hc.TimeoutSec != gceHcTimeoutSeconds
+	drift = drift || hc.UnhealthyThreshold != gceHcUnhealthyThreshold || hc.HealthyThreshold != gceHcHealthyThreshold
+	if drift {
+		glog.Warningf("Health check %v exists but parameters have drifted - updating...", name)
+		if err := gce.UpdateHttpHealthCheck(newHC); err != nil {
+			glog.Warningf("Failed to reconcile http health check %v parameters", name)
+			return nil, err
+		}
+		glog.V(4).Infof("Corrected health check %v parameters successfully", name)
+	}
+	return hc, nil
+}
+
+// Passing an empty string for the requested IP is perfectly fine - it just
+// means that no specific IP is being requested.
+// Returns whether the forwarding rule exists, whether it needs to be updated,
+// what its IP address is (if it exists), and any error we encountered.
+func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []v1.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
+	fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
+	if err != nil {
+		if isHTTPErrorCode(err, http.StatusNotFound) {
+			return false, true, "", nil
+		}
+		// Err on the side of caution in case of errors. Caller should notice the error and retry.
+		// We never want to end up recreating resources because gce api flaked.
+		return true, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
+	}
+	// If the user asks for a specific static ip through the Service spec,
+	// check that we're actually using it.
+	// TODO: we report loadbalancer IP through status, so we want to verify if
+	// that matches the forwarding rule as well.
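+	// For instance, a Service whose spec.loadBalancerIP was changed to a newly
+	// reserved static IP while the rule still carries its old ephemeral address
+	// falls into the first case below and is reported as (exists=true,
+	// needsUpdate=true), prompting the caller to rebuild the forwarding rule.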
+ if loadBalancerIP != "" && loadBalancerIP != fwd.IPAddress { + glog.Infof("LoadBalancer ip for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPAddress, loadBalancerIP) + return true, true, fwd.IPAddress, nil + } + portRange, err := loadBalancerPortRange(ports) + if err != nil { + // Err on the side of caution in case of errors. Caller should notice the error and retry. + // We never want to end up recreating resources because gce api flaked. + return true, false, "", err + } + if portRange != fwd.PortRange { + glog.Infof("LoadBalancer port range for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.PortRange, portRange) + return true, true, fwd.IPAddress, nil + } + // The service controller verified all the protocols match on the ports, just check the first one + if string(ports[0].Protocol) != fwd.IPProtocol { + glog.Infof("LoadBalancer protocol for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPProtocol, string(ports[0].Protocol)) + return true, true, fwd.IPAddress, nil + } + + return true, false, fwd.IPAddress, nil +} + +// Doesn't check whether the hosts have changed, since host updating is handled +// separately. +func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsUpdate bool, err error) { + tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() + if err != nil { + if isHTTPErrorCode(err, http.StatusNotFound) { + return false, true, nil + } + // Err on the side of caution in case of errors. Caller should notice the error and retry. + // We never want to end up recreating resources because gce api flaked. + return true, false, fmt.Errorf("error getting load balancer's target pool: %v", err) + } + // TODO: If the user modifies their Service's session affinity, it *should* + // reflect in the associated target pool. However, currently not setting the + // session affinity on a target pool defaults it to the empty string while + // not setting in on a Service defaults it to None. There is a lack of + // documentation around the default setting for the target pool, so if we + // find it's the undocumented empty string, don't blindly recreate the + // target pool (which results in downtime). Fix this when we have formally + // defined the defaults on either side. 
+ if tp.SessionAffinity != "" && translateAffinityType(affinityType) != tp.SessionAffinity { + glog.Infof("LoadBalancer target pool %v changed affinity from %v to %v", name, tp.SessionAffinity, affinityType) + return true, true, nil + } + return true, false, nil +} + +func (h *gceInstance) makeComparableHostPath() string { + return fmt.Sprintf("/zones/%s/instances/%s", h.Zone, h.Name) +} + +func nodeNames(nodes []*v1.Node) []string { + ret := make([]string, len(nodes)) + for i, node := range nodes { + ret[i] = node.Name + } + return ret +} + +func makeHostURL(projectID, zone, host string) string { + host = canonicalizeInstanceName(host) + return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/instances/%s", + projectID, zone, host) +} + +func hostURLToComparablePath(hostURL string) string { + idx := strings.Index(hostURL, "/zones/") + if idx < 0 { + return "" + } + return hostURL[idx:] +} + +func loadBalancerPortRange(ports []v1.ServicePort) (string, error) { + if len(ports) == 0 { + return "", fmt.Errorf("no ports specified for GCE load balancer") + } + + // The service controller verified all the protocols match on the ports, just check and use the first one + if ports[0].Protocol != v1.ProtocolTCP && ports[0].Protocol != v1.ProtocolUDP { + return "", fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(ports[0].Protocol)) + } + + minPort := int32(65536) + maxPort := int32(0) + for i := range ports { + if ports[i].Port < minPort { + minPort = ports[i].Port + } + if ports[i].Port > maxPort { + maxPort = ports[i].Port + } + } + return fmt.Sprintf("%d-%d", minPort, maxPort), nil +} + +// translate from what K8s supports to what the cloud provider supports for session affinity. +func translateAffinityType(affinityType v1.ServiceAffinity) string { + switch affinityType { + case v1.ServiceAffinityClientIP: + return gceAffinityTypeClientIP + case v1.ServiceAffinityNone: + return gceAffinityTypeNone + default: + glog.Errorf("Unexpected affinity type: %v", affinityType) + return gceAffinityTypeNone + } +} + +func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []v1.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) { + if gce.OnXPN() { + glog.V(2).Infoln("firewallNeedsUpdate: Cluster is on XPN network - skipping firewall creation") + return false, false, nil + } + + fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do() + if err != nil { + if isHTTPErrorCode(err, http.StatusNotFound) { + return false, true, nil + } + return false, false, fmt.Errorf("error getting load balancer's firewall: %v", err) + } + if fw.Description != makeFirewallDescription(serviceName, ipAddress) { + return true, true, nil + } + if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") { + return true, true, nil + } + // Make sure the allowed ports match. + allowedPorts := make([]string, len(ports)) + for ix := range ports { + allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port)) + } + if !equalStringSets(allowedPorts, fw.Allowed[0].Ports) { + return true, true, nil + } + // The service controller already verified that the protocol matches on all ports, no need to check. + + actualSourceRanges, err := netsets.ParseIPNets(fw.SourceRanges...) + if err != nil { + // This really shouldn't happen... 
GCE has returned something unexpected
+		glog.Warningf("Error parsing firewall SourceRanges: %v", fw.SourceRanges)
+		// We don't return the error, because we can hopefully recover from this by reconfiguring the firewall
+		return true, true, nil
+	}
+
+	if !sourceRanges.Equal(actualSourceRanges) {
+		return true, true, nil
+	}
+	return true, false, nil
+}
+
+func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, region string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error {
+	clusterID, err := gce.ClusterID.GetID()
+	if err != nil {
+		return fmt.Errorf("error getting cluster ID: %v", err)
+	}
+
+	// Prepare the firewall params for creating / checking.
+	desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID)
+	if !isNodesHealthCheck {
+		desc = makeFirewallDescription(serviceName, ipAddress)
+	}
+	sourceRanges := lbSrcRngsFlag.ipn
+	ports := []v1.ServicePort{{Protocol: "tcp", Port: hcPort}}
+
+	fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
+	fw, err := gce.service.Firewalls.Get(gce.projectID, fwName).Do()
+	if err != nil {
+		if !isHTTPErrorCode(err, http.StatusNotFound) {
+			return fmt.Errorf("error getting firewall for health checks: %v", err)
+		}
+		glog.Infof("Creating firewall %v for health checks.", fwName)
+		if err := gce.createFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
+			return err
+		}
+		glog.Infof("Created firewall %v for health checks.", fwName)
+		return nil
+	}
+	// Validate firewall fields.
+	if fw.Description != desc ||
+		len(fw.Allowed) != 1 ||
+		fw.Allowed[0].IPProtocol != string(ports[0].Protocol) ||
+		!equalStringSets(fw.Allowed[0].Ports, []string{strconv.Itoa(int(ports[0].Port))}) ||
+		!equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) {
+		glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName)
+		if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
+			glog.Warningf("Failed to reconcile firewall %v parameters.", fwName)
+			return err
+		}
+		glog.V(4).Infof("Corrected firewall %v parameters successfully", fwName)
+	}
+	return nil
+}
+
+func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []v1.ServicePort) error {
+	portRange, err := loadBalancerPortRange(ports)
+	if err != nil {
+		return err
+	}
+	req := &compute.ForwardingRule{
+		Name: name,
+		Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
+		IPAddress: ipAddress,
+		IPProtocol: string(ports[0].Protocol),
+		PortRange: portRange,
+		Target: gce.targetPoolURL(name, region),
+	}
+
+	if err = gce.CreateRegionForwardingRule(req, region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
+		return err
+	}
+	return nil
+}
+
+func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
+	firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
+	if err != nil {
+		return err
+	}
+	if err = gce.CreateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
+		return err
+	}
+	return nil
+}
+
+func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
+	firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
+	if err != nil {
+		return err
+	}
+
+	if err = gce.UpdateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
+		return err
+	}
+	return nil
+}
+
+func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
+	allowedPorts := make([]string, len(ports))
+	for ix := range ports {
+		allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port))
+	}
+	// If the node tags to be used for this cluster have been predefined in the
+	// provider config, just use them. Otherwise, invoke the computeHostTags method to get the tags.
+	hostTags := gce.nodeTags
+	if len(hostTags) == 0 {
+		var err error
+		if hostTags, err = gce.computeHostTags(hosts); err != nil {
+			return nil, fmt.Errorf("no node tags supplied and failed to parse the given list of hosts for tags; aborting firewall rule creation")
+		}
+	}
+
+	firewall := &compute.Firewall{
+		Name: name,
+		Description: desc,
+		Network: gce.networkURL,
+		SourceRanges: sourceRanges.StringSlice(),
+		TargetTags: hostTags,
+		Allowed: []*compute.FirewallAllowed{
+			{
+				// TODO: Make this more generic. Currently this method is only
+				// used to create firewall rules for loadbalancers, which have
+				// exactly one protocol, so we can never end up with a list of
+				// mixed TCP and UDP ports. It should be possible to use a
+				// single firewall rule for both a TCP and UDP lb.
+				IPProtocol: strings.ToLower(string(ports[0].Protocol)),
+				Ports: allowedPorts,
+			},
+		},
+	}
+	return firewall, nil
+}
+
+func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
+	pageToken := ""
+	page := 0
+	for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
+		listCall := gce.service.Addresses.List(gce.projectID, region)
+		if pageToken != "" {
+			listCall = listCall.PageToken(pageToken)
+		}
+		addresses, err := listCall.Do()
+		if err != nil {
+			return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
+		}
+		pageToken = addresses.NextPageToken
+		for _, addr := range addresses.Items {
+			if addr.Address == ipAddress {
+				// This project does own the address, so return success.
+				return true, nil
+			}
+		}
+	}
+	if page >= maxPages {
+		glog.Errorf("projectOwnsStaticIP exceeded maxPages=%d for Addresses.List; truncating.", maxPages)
+	}
+	return false, nil
+}
+
+func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string) (ipAddress string, created bool, err error) {
+	// If the address doesn't exist, this will create it.
+	// If the existingIP exists but is ephemeral, this will promote it to static.
+	// If the address already exists, this will harmlessly return a StatusConflict
+	// and we'll grab the IP before returning.
+	existed := false
+	addressObj := &compute.Address{
+		Name: name,
+		Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
+	}
+
+	if existingIP != "" {
+		addressObj.Address = existingIP
+	}
+
+	address, err := gce.ReserveRegionAddress(addressObj, region)
+	if err != nil {
+		if !isHTTPErrorCode(err, http.StatusConflict) {
+			return "", false, fmt.Errorf("error creating gce static IP address: %v", err)
+		}
+		// StatusConflict == the IP exists already. ReserveRegionAddress returns a
+		// nil address on that path, so re-get the existing address rather than
+		// dereferencing nil below.
+		existed = true
+		if address, err = gce.GetRegionAddress(name, region); err != nil {
+			return "", false, fmt.Errorf("error getting gce static IP address: %v", err)
+		}
+	}
+
+	return address.Address, existed, nil
+}
diff --git a/pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go b/pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go
new file mode 100644
index 00000000000..47b7ed90ee8
--- /dev/null
+++ b/pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go
@@ -0,0 +1,636 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package gce
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/golang/glog"
+	compute "google.golang.org/api/compute/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/api/v1"
+	v1_service "k8s.io/kubernetes/pkg/api/v1/service"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+)
+
+const (
+	allInstances = "ALL"
+)
+
+type lbBalancingMode string
+
+func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
+	nm := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
+	ports, protocol := getPortsAndProtocol(svc.Spec.Ports)
+	scheme := schemeInternal
+	loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
+	shared := !v1_service.RequestsOnlyLocalTraffic(svc)
+	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shared, scheme, protocol, svc.Spec.SessionAffinity)
+	backendServiceLink := gce.getBackendServiceLink(backendServiceName)
+
+	// Ensure instance groups exist and nodes are assigned to groups
+	igName := makeInstanceGroupName(clusterID)
+	igLinks, err := gce.ensureInternalInstanceGroups(igName, nodes)
+	if err != nil {
+		return nil, err
+	}
+
+	// Lock the sharedResourceLock to prevent any deletions of shared resources while they are being assembled here
+	gce.sharedResourceLock.Lock()
+	defer gce.sharedResourceLock.Unlock()
+
+	// Ensure health check for backend service is created
+	// By default, use the node health check endpoint
+	hcName := makeHealthCheckName(loadBalancerName, clusterID, shared)
+	hcPath, hcPort := GetNodesHealthCheckPath(), GetNodesHealthCheckPort()
+	if !shared {
+		// Service requires a special health check, retrieve the OnlyLocal port & path
+		hcPath, hcPort = v1_service.GetServiceHealthCheckPathPort(svc)
+	}
+	hc, err := gce.ensureInternalHealthCheck(hcName, nm, shared, hcPath, hcPort)
+	if err != nil {
+		return nil, err
+	}
+
+	// Ensure firewall rules if necessary
+	if gce.OnXPN() {
+		glog.V(2).Infof("ensureInternalLoadBalancer: cluster is on a cross-project network (XPN); network project %v, compute project %v - skipping firewall creation", gce.networkProjectID, gce.projectID)
+	} else {
+		if err = gce.ensureInternalFirewalls(loadBalancerName, clusterID, nm, svc, strconv.Itoa(int(hcPort)), shared, nodes); err != nil {
+			return nil, err
+		}
+	}
+
+	expectedFwdRule := &compute.ForwardingRule{
+		Name:                loadBalancerName,
+		Description:         fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, nm.String()),
+		IPAddress:           svc.Spec.LoadBalancerIP,
+		BackendService:      backendServiceLink,
+		Ports:               ports,
+		IPProtocol:          string(protocol),
+		LoadBalancingScheme: string(scheme),
+	}
+
+	// Specify subnetwork if network type is manual
+	if len(gce.subnetworkURL) > 0 {
+		expectedFwdRule.Subnetwork = gce.subnetworkURL
+	} else {
+		expectedFwdRule.Network = gce.networkURL
+	}
+
+	// Delete the previous internal load balancer if necessary
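+	// Reviewer note: GCE forwarding rules cannot be updated in place, so any
+	// drift is handled by delete + recreate. E.g. (hypothetical) changing a
+	// Service port from 80 to 443 makes expectedFwdRule.Ports differ from
+	// existingFwdRule.Ports; fwdRuleEqual below then reports false and the
+	// stale rule is removed before the new one is created.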
+	fwdRuleDeleted, err := gce.clearExistingInternalLB(loadBalancerName, existingFwdRule, expectedFwdRule, backendServiceName)
+	if err != nil {
+		return nil, err
+	}
+
+	bsDescription := makeBackendServiceDescription(nm, shared)
+	err = gce.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink)
+	if err != nil {
+		return nil, err
+	}
+
+	// If we previously deleted the forwarding rule or it never existed, finally create it.
+	if fwdRuleDeleted || existingFwdRule == nil {
+		glog.V(2).Infof("ensureInternalLoadBalancer(%v(%v)): creating forwarding rule", loadBalancerName, svc.Name)
+		if err = gce.CreateRegionForwardingRule(expectedFwdRule, gce.region); err != nil {
+			return nil, err
+		}
+	}
+
+	// Get the most recent forwarding rule for the new address.
+	existingFwdRule, err = gce.GetRegionForwardingRule(loadBalancerName, gce.region)
+	if err != nil {
+		return nil, err
+	}
+
+	status := &v1.LoadBalancerStatus{}
+	status.Ingress = []v1.LoadBalancerIngress{{IP: existingFwdRule.IPAddress}}
+	return status, nil
+}
+
+func (gce *GCECloud) clearExistingInternalLB(loadBalancerName string, existingFwdRule, expectedFwdRule *compute.ForwardingRule, expectedBSName string) (fwdRuleDeleted bool, err error) {
+	if existingFwdRule == nil {
+		return false, nil
+	}
+
+	if !fwdRuleEqual(existingFwdRule, expectedFwdRule) {
+		glog.V(2).Infof("clearExistingInternalLB(%v): deleting existing forwarding rule with IP address %v", loadBalancerName, existingFwdRule.IPAddress)
+		if err = gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
+			return false, err
+		}
+		fwdRuleDeleted = true
+	}
+
+	existingBSName := getNameFromLink(existingFwdRule.BackendService)
+	bs, err := gce.GetRegionBackendService(existingBSName, gce.region)
+	if err == nil {
+		if bs.Name != expectedBSName {
+			glog.V(2).Infof("clearExistingInternalLB(%v): expected backend service %q does not match actual %q - deleting backend service & healthcheck & firewall", loadBalancerName, expectedBSName, bs.Name)
+			// Delete the backend service as well in case it's switching between shared, nonshared, tcp, udp.
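+			// Reviewer note: the name encodes the configuration, so a mismatch
+			// means the Service changed shape - e.g. (hypothetical) switching
+			// between shared and OnlyLocal traffic, or between TCP and UDP,
+			// moves it to a differently named backend service, and the old
+			// chain (backend service, health check, firewall) must be torn down.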
+			var existingHCName string
+			if len(bs.HealthChecks) == 1 {
+				existingHCName = getNameFromLink(bs.HealthChecks[0])
+			}
+			if err = gce.teardownInternalBackendResources(existingBSName, existingHCName); err != nil {
+				glog.Warningf("clearExistingInternalLB: could not delete old resources: %v", err)
+			} else {
+				glog.V(2).Infof("clearExistingInternalLB: done deleting old resources")
+			}
+		}
+	} else {
+		glog.Warningf("clearExistingInternalLB(%v): failed to retrieve existing backend service %v", loadBalancerName, existingBSName)
+	}
+	return fwdRuleDeleted, nil
+}
+
+func (gce *GCECloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v1.Service, nodes []*v1.Node) error {
+	gce.sharedResourceLock.Lock()
+	defer gce.sharedResourceLock.Unlock()
+
+	igName := makeInstanceGroupName(clusterID)
+	igLinks, err := gce.ensureInternalInstanceGroups(igName, nodes)
+	if err != nil {
+		return err
+	}
+
+	// Generate the backend service name
+	_, protocol := getPortsAndProtocol(svc.Spec.Ports)
+	scheme := schemeInternal
+	loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
+	shared := !v1_service.RequestsOnlyLocalTraffic(svc)
+	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shared, scheme, protocol, svc.Spec.SessionAffinity)
+	// Ensure the backend service has the proper backend instance-group links
+	return gce.ensureInternalBackendServiceGroups(backendServiceName, igLinks)
+}
+
+func (gce *GCECloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, svc *v1.Service) error {
+	loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
+	_, protocol := getPortsAndProtocol(svc.Spec.Ports)
+	scheme := schemeInternal
+	shared := !v1_service.RequestsOnlyLocalTraffic(svc)
+	var err error
+
+	gce.sharedResourceLock.Lock()
+	defer gce.sharedResourceLock.Unlock()
+
+	glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule", loadBalancerName)
+	if err = gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
+		return err
+	}
+
+	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shared, scheme, protocol, svc.Spec.SessionAffinity)
+	glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName)
+	if err = gce.DeleteRegionBackendService(backendServiceName, gce.region); err != nil && !isNotFoundOrInUse(err) {
+		return err
+	}
+
+	// Only delete the health check & health check firewall if they aren't being used by another LB. If we get
+	// a resourceInUseByAnotherResource error, then we can skip deleting the firewall.
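+	// Reviewer sketch of the expected flow: DeleteHealthCheck returns a GCE
+	// "in use by another resource" error while some other internal LB still
+	// references the shared node health check, in which case we treat the
+	// delete as a no-op and keep its firewall too.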
+	hcInUse := false
+	hcName := makeHealthCheckName(loadBalancerName, clusterID, shared)
+	glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting health check %v", loadBalancerName, hcName)
+	if err = gce.DeleteHealthCheck(hcName); err != nil && !isNotFoundOrInUse(err) {
+		return err
+	} else if isInUsedByError(err) {
+		glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): healthcheck %v still in use", loadBalancerName, hcName)
+		hcInUse = true
+	}
+
+	glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting firewall for traffic", loadBalancerName)
+	if err = gce.DeleteFirewall(loadBalancerName); err != nil {
+		return err
+	}
+
+	if hcInUse {
+		glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): skipping firewall for healthcheck", loadBalancerName)
+	} else {
+		glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting firewall for healthcheck", loadBalancerName)
+		fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, shared)
+		if err = gce.DeleteFirewall(fwHCName); err != nil && !isInUsedByError(err) {
+			return err
+		}
+	}
+
+	// Try deleting instance groups - expect ResourceInuse error if needed by other LBs
+	igName := makeInstanceGroupName(clusterID)
+	if err = gce.ensureInternalInstanceGroupsDeleted(igName); err != nil && !isInUsedByError(err) {
+		return err
+	}
+
+	return nil
+}
+
+func (gce *GCECloud) teardownInternalBackendResources(bsName, hcName string) error {
+	if err := gce.DeleteRegionBackendService(bsName, gce.region); err != nil {
+		if isNotFound(err) {
+			glog.V(2).Infof("backend service already deleted: %v, err: %v", bsName, err)
+		} else if isInUsedByError(err) {
+			glog.V(2).Infof("backend service in use: %v, err: %v", bsName, err)
+		} else {
+			return fmt.Errorf("failed to delete backend service: %v, err: %v", bsName, err)
+		}
+	}
+
+	if hcName == "" {
+		return nil
+	}
+	hcInUse := false
+	if err := gce.DeleteHealthCheck(hcName); err != nil {
+		if isNotFound(err) {
+			glog.V(2).Infof("health check already deleted: %v, err: %v", hcName, err)
+		} else if isInUsedByError(err) {
+			hcInUse = true
+			glog.V(2).Infof("health check in use: %v, err: %v", hcName, err)
+		} else {
+			return fmt.Errorf("failed to delete health check: %v, err: %v", hcName, err)
+		}
+	}
+
+	hcFirewallName := makeHealthCheckFirewallNameFromHC(hcName)
+	if hcInUse {
+		glog.V(2).Infof("skipping deletion of health check firewall: %v", hcFirewallName)
+		return nil
+	}
+
+	if err := gce.DeleteFirewall(hcFirewallName); err != nil && !isNotFound(err) {
+		return fmt.Errorf("failed to delete health check firewall: %v, err: %v", hcFirewallName, err)
+	}
+	return nil
+}
+
+func (gce *GCECloud) ensureInternalFirewall(fwName, fwDesc string, sourceRanges []string, ports []string, protocol v1.Protocol, nodes []*v1.Node) error {
+	glog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
+	targetTags, err := gce.GetNodeTags(nodeNames(nodes))
+	if err != nil {
+		return err
+	}
+
+	existingFirewall, err := gce.GetFirewall(fwName)
+	if err != nil && !isNotFound(err) {
+		return err
+	}
+
+	expectedFirewall := &compute.Firewall{
+		Name:         fwName,
+		Description:  fwDesc,
+		Network:      gce.networkURL,
+		SourceRanges: sourceRanges,
+		TargetTags:   targetTags,
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: strings.ToLower(string(protocol)),
+				Ports:      ports,
+			},
+		},
+	}
+
+	if existingFirewall == nil {
+		glog.V(2).Infof("ensureInternalFirewall(%v): creating firewall", fwName)
+		return gce.CreateFirewall(expectedFirewall)
+	}
+
+	if firewallRuleEqual(expectedFirewall, existingFirewall) {
+		return nil
+	}
+
+	glog.V(2).Infof("ensureInternalFirewall(%v): updating firewall", fwName)
+	return gce.UpdateFirewall(expectedFirewall)
+}
+
+func (gce *GCECloud) ensureInternalFirewalls(loadBalancerName, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, shared bool, nodes []*v1.Node) error {
+	// First firewall is for ingress traffic
+	fwDesc := makeFirewallDescription(nm.String(), svc.Spec.LoadBalancerIP)
+	ports, protocol := getPortsAndProtocol(svc.Spec.Ports)
+	sourceRanges, err := v1_service.GetLoadBalancerSourceRanges(svc)
+	if err != nil {
+		return err
+	}
+	err = gce.ensureInternalFirewall(loadBalancerName, fwDesc, sourceRanges.StringSlice(), ports, protocol, nodes)
+	if err != nil {
+		return err
+	}
+
+	// Second firewall is for health checking nodes / services
+	fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, shared)
+	hcSrcRanges := LoadBalancerSrcRanges()
+	return gce.ensureInternalFirewall(fwHCName, "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes)
+}
+
+func (gce *GCECloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) {
+	glog.V(2).Infof("ensureInternalHealthCheck(%v, %v, %v): checking existing health check", name, path, port)
+	expectedHC := newInternalLBHealthCheck(name, svcName, shared, path, port)
+
+	hc, err := gce.GetHealthCheck(name)
+	if err != nil && !isNotFound(err) {
+		return nil, err
+	}
+
+	if hc == nil {
+		glog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
+		if err = gce.CreateHealthCheck(expectedHC); err != nil {
+			return nil, err
+		}
+		hc, err = gce.GetHealthCheck(name)
+		if err != nil {
+			glog.Errorf("Failed to get http health check %v", err)
+			return nil, err
+		}
+		glog.V(2).Infof("ensureInternalHealthCheck: created health check %v", name)
+		return hc, nil
+	}
+
+	if healthChecksEqual(expectedHC, hc) {
+		return hc, nil
+	}
+
+	glog.V(2).Infof("ensureInternalHealthCheck: health check %v exists but parameters have drifted - updating...", name)
+	if err := gce.UpdateHealthCheck(expectedHC); err != nil {
+		glog.Warningf("Failed to reconcile http health check %v parameters", name)
+		return nil, err
+	}
+	glog.V(2).Infof("ensureInternalHealthCheck: corrected health check %v parameters successfully", name)
+	return hc, nil
+}
+
+func (gce *GCECloud) ensureInternalInstanceGroup(name, zone string, nodes []*v1.Node) (string, error) {
+	glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): checking that group contains %v nodes", name, zone, len(nodes))
+	ig, err := gce.GetInstanceGroup(name, zone)
+	if err != nil && !isNotFound(err) {
+		return "", err
+	}
+
+	kubeNodes := sets.NewString()
+	for _, n := range nodes {
+		kubeNodes.Insert(n.Name)
+	}
+
+	gceNodes := sets.NewString()
+	if ig == nil {
+		glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): creating instance group", name, zone)
+		ig, err = gce.CreateInstanceGroup(name, zone)
+		if err != nil {
+			return "", err
+		}
+	} else {
+		instances, err := gce.ListInstancesInInstanceGroup(name, zone, allInstances)
+		if err != nil {
+			return "", err
+		}
+
+		for _, ins := range instances.Items {
+			parts := strings.Split(ins.Instance, "/")
+			gceNodes.Insert(parts[len(parts)-1])
+		}
+	}
+
+	removeNodes := gceNodes.Difference(kubeNodes).List()
+	addNodes := kubeNodes.Difference(gceNodes).List()
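+
+	// Reviewer note, e.g. (hypothetical): kubeNodes {"n1", "n2"} and gceNodes
+	// {"n2", "n3"} yield removeNodes ["n3"] and addNodes ["n1"], converging
+	// the group on the current cluster membership.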
glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): removing nodes: %v", name, zone, removeNodes) + instanceRefs := gce.ToInstanceReferences(zone, removeNodes) + // Possible we'll receive 404's here if the instance was deleted before getting to this point. + if err = gce.RemoveInstancesFromInstanceGroup(name, zone, instanceRefs); err != nil && !isNotFound(err) { + return "", err + } + } + + if len(addNodes) != 0 { + glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): adding nodes: %v", name, zone, addNodes) + instanceRefs := gce.ToInstanceReferences(zone, addNodes) + if err = gce.AddInstancesToInstanceGroup(name, zone, instanceRefs); err != nil { + return "", err + } + } + + return ig.SelfLink, nil +} + +// ensureInternalInstanceGroups generates an unmanaged instance group for every zone +// where a K8s node exists. It also ensures that each node belongs to an instance group +func (gce *GCECloud) ensureInternalInstanceGroups(name string, nodes []*v1.Node) ([]string, error) { + zonedNodes := splitNodesByZone(nodes) + glog.V(2).Infof("ensureInternalInstanceGroups(%v): %d nodes over %d zones in region %v", name, len(nodes), len(zonedNodes), gce.region) + var igLinks []string + for zone, nodes := range zonedNodes { + igLink, err := gce.ensureInternalInstanceGroup(name, zone, nodes) + if err != nil { + return []string{}, err + } + igLinks = append(igLinks, igLink) + } + + return igLinks, nil +} + +func (gce *GCECloud) ensureInternalInstanceGroupsDeleted(name string) error { + // List of nodes isn't available here - fetch all zones in region and try deleting this cluster's ig + zones, err := gce.ListZonesInRegion(gce.region) + if err != nil { + return err + } + + glog.V(2).Infof("ensureInternalInstanceGroupsDeleted(%v): deleting instance group in all %d zones", name, len(zones)) + for _, z := range zones { + if err := gce.DeleteInstanceGroup(name, z.Name); err != nil && !isNotFound(err) { + return err + } + } + return nil +} + +func (gce *GCECloud) ensureInternalBackendService(name, description string, affinityType v1.ServiceAffinity, scheme lbScheme, protocol v1.Protocol, igLinks []string, hcLink string) error { + glog.V(2).Infof("ensureInternalBackendService(%v, %v, %v): checking existing backend service with %d groups", name, scheme, protocol, len(igLinks)) + bs, err := gce.GetRegionBackendService(name, gce.region) + if err != nil && !isNotFound(err) { + return err + } + + backends := backendsFromGroupLinks(igLinks) + expectedBS := &compute.BackendService{ + Name: name, + Protocol: string(protocol), + Description: description, + HealthChecks: []string{hcLink}, + Backends: backends, + SessionAffinity: translateAffinityType(affinityType), + LoadBalancingScheme: string(scheme), + } + + // Create backend service if none was found + if bs == nil { + glog.V(2).Infof("ensureInternalBackendService: creating backend service %v", name) + err := gce.CreateRegionBackendService(expectedBS, gce.region) + if err != nil { + return err + } + glog.V(2).Infof("ensureInternalBackendService: created backend service %v successfully", name) + return nil + } + // Check existing backend service + existingIGLinks := sets.NewString() + for _, be := range bs.Backends { + existingIGLinks.Insert(be.Group) + } + + if backendSvcEqual(expectedBS, bs) { + return nil + } + + glog.V(2).Infof("ensureInternalBackendService: updating backend service %v", name) + if err := gce.UpdateRegionBackendService(expectedBS, gce.region); err != nil { + return err + } + glog.V(2).Infof("ensureInternalBackendService: updated backend 
+
+func (gce *GCECloud) ensureInternalBackendServiceGroups(name string, igLinks []string) error {
+	glog.V(2).Infof("ensureInternalBackendServiceGroups(%v): checking existing backend service's groups", name)
+	bs, err := gce.GetRegionBackendService(name, gce.region)
+	if err != nil {
+		return err
+	}
+
+	backends := backendsFromGroupLinks(igLinks)
+	if backendsListEqual(bs.Backends, backends) {
+		return nil
+	}
+
+	glog.V(2).Infof("ensureInternalBackendServiceGroups: updating backend service %v", name)
+	bs.Backends = backends
+	if err := gce.UpdateRegionBackendService(bs, gce.region); err != nil {
+		return err
+	}
+	glog.V(2).Infof("ensureInternalBackendServiceGroups: updated backend service %v successfully", name)
+	return nil
+}
+
+func backendsFromGroupLinks(igLinks []string) []*compute.Backend {
+	var backends []*compute.Backend
+	for _, igLink := range igLinks {
+		backends = append(backends, &compute.Backend{
+			Group: igLink,
+		})
+	}
+	return backends
+}
+
+func newInternalLBHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) *compute.HealthCheck {
+	httpSettings := compute.HTTPHealthCheck{
+		Port:        int64(port),
+		RequestPath: path,
+	}
+	desc := ""
+	if !shared {
+		desc = makeHealthCheckDescription(svcName.String())
+	}
+	return &compute.HealthCheck{
+		Name:               name,
+		CheckIntervalSec:   gceHcCheckIntervalSeconds,
+		TimeoutSec:         gceHcTimeoutSeconds,
+		HealthyThreshold:   gceHcHealthyThreshold,
+		UnhealthyThreshold: gceHcUnhealthyThreshold,
+		HttpHealthCheck:    &httpSettings,
+		Type:               "HTTP",
+		Description:        desc,
+	}
+}
+
+func firewallRuleEqual(a, b *compute.Firewall) bool {
+	return a.Description == b.Description &&
+		len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) &&
+		a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol &&
+		equalStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) &&
+		equalStringSets(a.SourceRanges, b.SourceRanges) &&
+		equalStringSets(a.TargetTags, b.TargetTags)
+}
+
+func healthChecksEqual(a, b *compute.HealthCheck) bool {
+	return a.HttpHealthCheck != nil && b.HttpHealthCheck != nil &&
+		a.HttpHealthCheck.Port == b.HttpHealthCheck.Port &&
+		a.HttpHealthCheck.RequestPath == b.HttpHealthCheck.RequestPath &&
+		a.Description == b.Description &&
+		a.CheckIntervalSec == b.CheckIntervalSec &&
+		a.TimeoutSec == b.TimeoutSec &&
+		a.UnhealthyThreshold == b.UnhealthyThreshold &&
+		a.HealthyThreshold == b.HealthyThreshold
+}
+
+// backendsListEqual asserts that backend lists are equal by instance group link only
+func backendsListEqual(a, b []*compute.Backend) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	if len(a) == 0 {
+		return true
+	}
+
+	aSet := sets.NewString()
+	for _, v := range a {
+		aSet.Insert(v.Group)
+	}
+	bSet := sets.NewString()
+	for _, v := range b {
+		bSet.Insert(v.Group)
+	}
+
+	return aSet.Equal(bSet)
+}
+
+func backendSvcEqual(a, b *compute.BackendService) bool {
+	return a.Protocol == b.Protocol &&
+		a.Description == b.Description &&
+		a.SessionAffinity == b.SessionAffinity &&
+		a.LoadBalancingScheme == b.LoadBalancingScheme &&
+		equalStringSets(a.HealthChecks, b.HealthChecks) &&
+		backendsListEqual(a.Backends, b.Backends)
+}
+
+func fwdRuleEqual(a, b *compute.ForwardingRule) bool {
+	return (a.IPAddress == "" || b.IPAddress == "" || a.IPAddress == b.IPAddress) &&
+		a.IPProtocol == b.IPProtocol &&
+		a.LoadBalancingScheme == b.LoadBalancingScheme &&
+		equalStringSets(a.Ports, b.Ports) &&
+		a.BackendService == b.BackendService
+}
+
+func getPortsAndProtocol(svcPorts []v1.ServicePort) (ports []string, protocol v1.Protocol) {
+	if len(svcPorts) == 0 {
+		return []string{}, v1.ProtocolUDP
+	}
+
+	// GCP doesn't support multiple protocols for a single load balancer
+	protocol = svcPorts[0].Protocol
+	for _, p := range svcPorts {
+		ports = append(ports, strconv.Itoa(int(p.Port)))
+	}
+	return ports, protocol
+}
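+
+// Reviewer note, e.g. (hypothetical): a Service exposing TCP 80 and TCP 443
+// yields ports ["80", "443"] and protocol "TCP". Mixed-protocol Services are
+// not representable on a single GCP LB, so the first port's protocol wins.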
+
+func (gce *GCECloud) getBackendServiceLink(name string) string {
+	return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/backendServices/%s", gce.projectID, gce.region, name)
+}
+
+func getNameFromLink(link string) string {
+	fields := strings.Split(link, "/")
+	return fields[len(fields)-1]
+}
diff --git a/pkg/cloudprovider/providers/gce/gce_loadbalancer_naming.go b/pkg/cloudprovider/providers/gce/gce_loadbalancer_naming.go
new file mode 100644
index 00000000000..7279be63ffe
--- /dev/null
+++ b/pkg/cloudprovider/providers/gce/gce_loadbalancer_naming.go
@@ -0,0 +1,103 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package gce
+
+import (
+	"fmt"
+	"strings"
+
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/api/v1"
+)
+
+// Internal Load Balancer
+
+// Instance groups keep their legacy naming scheme to stay consistent with ingress
+func makeInstanceGroupName(clusterID string) string {
+	return fmt.Sprintf("k8s-ig--%s", clusterID)
+}
+
+func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme lbScheme, protocol v1.Protocol, svcAffinity v1.ServiceAffinity) string {
+	if shared {
+		affinity := ""
+		switch svcAffinity {
+		case v1.ServiceAffinityClientIP:
+			affinity = "clientip"
+		default:
+			affinity = "noaffinity"
+		}
+
+		return fmt.Sprintf("k8s-%s-%s-%s-%s", clusterID, strings.ToLower(string(scheme)), strings.ToLower(string(protocol)), affinity)
+	}
+	return loadBalancerName
+}
+
+func makeHealthCheckName(loadBalancerName, clusterID string, shared bool) string {
+	if shared {
+		return fmt.Sprintf("k8s-%s-node", clusterID)
+	}
+
+	return loadBalancerName
+}
+
+func makeHealthCheckFirewallNameFromHC(healthCheckName string) string {
+	return healthCheckName + "-hc"
+}
+
+func makeHealthCheckFirewallName(loadBalancerName, clusterID string, shared bool) string {
+	if shared {
+		return fmt.Sprintf("k8s-%s-node-hc", clusterID)
+	}
+	return loadBalancerName + "-hc"
+}
+
+func makeBackendServiceDescription(nm types.NamespacedName, shared bool) string {
+	if shared {
+		return ""
+	}
+	return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, nm.String())
+}
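+
+// Reviewer note - a naming sketch with hypothetical inputs (clusterID "uid",
+// per-LB name "a1b2c3"), derived from the helpers above:
+//
+//	makeInstanceGroupName("uid")                       -> "k8s-ig--uid"
+//	makeBackendServiceName(_, "uid", true,
+//	    schemeInternal, "TCP", "None")                 -> "k8s-uid-internal-tcp-noaffinity"
+//	makeBackendServiceName("a1b2c3", _, false, ...)    -> "a1b2c3"
+//	makeHealthCheckName(_, "uid", true)                -> "k8s-uid-node"
+//	makeHealthCheckFirewallName(_, "uid", true)        -> "k8s-uid-node-hc"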
+
+// External Load Balancer
+
+// makeNodesHealthCheckName returns the name of the health check resource used by
+// the GCE load balancers (l4) for performing health checks on nodes.
+func makeNodesHealthCheckName(clusterID string) string {
+	return fmt.Sprintf("k8s-%v-node", clusterID)
+}
+
+func makeHealthCheckDescription(serviceName string) string {
+	return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName)
+}
+
+// MakeHealthCheckFirewallName returns the firewall name used by the GCE load
+// balancers (l4) for performing health checks.
+func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
+	if isNodesHealthCheck {
+		return makeNodesHealthCheckName(clusterID) + "-http-hc"
+	}
+	return "k8s-" + hcName + "-http-hc"
+}
+
+func makeFirewallName(name string) string {
+	return fmt.Sprintf("k8s-fw-%s", name)
+}
+
+func makeFirewallDescription(serviceName, ipAddress string) string {
+	return fmt.Sprintf(`{"kubernetes.io/service-name":"%s", "kubernetes.io/service-ip":"%s"}`,
+		serviceName, ipAddress)
+}
diff --git a/pkg/cloudprovider/providers/gce/gce_staticip.go b/pkg/cloudprovider/providers/gce/gce_staticip.go
deleted file mode 100644
index 36aa119e898..00000000000
--- a/pkg/cloudprovider/providers/gce/gce_staticip.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package gce
-
-import (
-	"time"
-
-	compute "google.golang.org/api/compute/v1"
-)
-
-func newStaticIPMetricContext(request string) *metricContext {
-	return &metricContext{
-		start:      time.Now(),
-		attributes: []string{"staticip_" + request, unusedMetricLabel, unusedMetricLabel},
-	}
-}
-
-// ReserveGlobalStaticIP creates a global static IP.
-// Caller is allocated a random IP if they do not specify an ipAddress. If an
-// ipAddress is specified, it must belong to the current project, eg: an
-// ephemeral IP associated with a global forwarding rule.
-func (gce *GCECloud) ReserveGlobalStaticIP(name, ipAddress string) (address *compute.Address, err error) {
-	mc := newStaticIPMetricContext("reserve")
-	op, err := gce.service.GlobalAddresses.Insert(gce.projectID, &compute.Address{Name: name, Address: ipAddress}).Do()
-
-	if err != nil {
-		return nil, mc.Observe(err)
-	}
-
-	if err := gce.waitForGlobalOp(op, mc); err != nil {
-		return nil, err
-	}
-
-	return gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
-}
-
-// DeleteGlobalStaticIP deletes a global static IP by name.
-func (gce *GCECloud) DeleteGlobalStaticIP(name string) error {
-	mc := newStaticIPMetricContext("delete")
-	op, err := gce.service.GlobalAddresses.Delete(gce.projectID, name).Do()
-	if err != nil {
-		return mc.Observe(err)
-	}
-	return gce.waitForGlobalOp(op, mc)
-}
-
-// GetGlobalStaticIP returns the global static IP by name.
-func (gce *GCECloud) GetGlobalStaticIP(name string) (*compute.Address, error) {
-	mc := newStaticIPMetricContext("get")
-	v, err := gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
-	return v, mc.Observe(err)
-}
diff --git a/pkg/cloudprovider/providers/gce/gce_targetpool.go b/pkg/cloudprovider/providers/gce/gce_targetpool.go
new file mode 100644
index 00000000000..10304bfe93b
--- /dev/null
+++ b/pkg/cloudprovider/providers/gce/gce_targetpool.go
@@ -0,0 +1,84 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package gce
+
+import (
+	"time"
+
+	compute "google.golang.org/api/compute/v1"
+)
+
+func newTargetPoolMetricContext(request, region string) *metricContext {
+	return &metricContext{
+		start:      time.Now(),
+		attributes: []string{"targetpool_" + request, region, unusedMetricLabel},
+	}
+}
+
+// GetTargetPool returns the TargetPool by name.
+func (gce *GCECloud) GetTargetPool(name, region string) (*compute.TargetPool, error) {
+	mc := newTargetPoolMetricContext("get", region)
+	v, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
+	return v, mc.Observe(err)
+}
+
+// CreateTargetPool creates the passed TargetPool.
+func (gce *GCECloud) CreateTargetPool(tp *compute.TargetPool, region string) (*compute.TargetPool, error) {
+	mc := newTargetPoolMetricContext("create", region)
+	op, err := gce.service.TargetPools.Insert(gce.projectID, region, tp).Do()
+	if err != nil {
+		return nil, mc.Observe(err)
+	}
+
+	if err := gce.waitForRegionOp(op, region, mc); err != nil {
+		return nil, err
+	}
+
+	return gce.GetTargetPool(tp.Name, region)
+}
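+
+// Reviewer note - a usage sketch with hypothetical values: the external LB
+// path creates the pool first and then points a forwarding rule at it, e.g.
+//
+//	pool := &compute.TargetPool{Name: "a1b2c3", Instances: instanceLinks}
+//	if _, err := gce.CreateTargetPool(pool, "us-central1"); err != nil { ... }
+//	// createForwardingRule then sets Target: gce.targetPoolURL("a1b2c3", "us-central1")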
+
+// DeleteTargetPool deletes TargetPool by name.
+func (gce *GCECloud) DeleteTargetPool(name, region string) error {
+	mc := newTargetPoolMetricContext("delete", region)
+	op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
+	if err != nil {
+		return mc.Observe(err)
+	}
+	return gce.waitForRegionOp(op, region, mc)
+}
+
+// AddInstancesToTargetPool adds instances by link to the TargetPool
+func (gce *GCECloud) AddInstancesToTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error {
+	add := &compute.TargetPoolsAddInstanceRequest{Instances: instanceRefs}
+	mc := newTargetPoolMetricContext("add_instances", region)
+	op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do()
+	if err != nil {
+		return mc.Observe(err)
+	}
+	return gce.waitForRegionOp(op, region, mc)
+}
+
+// RemoveInstancesFromTargetPool removes instances by link from the TargetPool
+func (gce *GCECloud) RemoveInstancesFromTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error {
+	remove := &compute.TargetPoolsRemoveInstanceRequest{Instances: instanceRefs}
+	mc := newTargetPoolMetricContext("remove_instances", region)
+	op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, remove).Do()
+	if err != nil {
+		return mc.Observe(err)
+	}
+	return gce.waitForRegionOp(op, region, mc)
+}
diff --git a/pkg/cloudprovider/providers/gce/gce_util.go b/pkg/cloudprovider/providers/gce/gce_util.go
index 90b51759f22..bcbd523c889 100644
--- a/pkg/cloudprovider/providers/gce/gce_util.go
+++ b/pkg/cloudprovider/providers/gce/gce_util.go
@@ -134,3 +134,18 @@ func equalStringSets(x, y []string) bool {
 	yString := sets.NewString(y...)
 	return xString.Equal(yString)
 }
+
+func isNotFound(err error) bool {
+	return isHTTPErrorCode(err, http.StatusNotFound)
+}
+
+func ignoreNotFound(err error) error {
+	if err == nil || isNotFound(err) {
+		return nil
+	}
+	return err
+}
+
+func isNotFoundOrInUse(err error) bool {
+	return isNotFound(err) || isInUsedByError(err)
+}
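
// Reviewer note: these wrappers keep the delete paths above idempotent, e.g.
//
//	if err := gce.DeleteRegionBackendService(name, gce.region); err != nil && !isNotFoundOrInUse(err) {
//		return err
//	}
//
// so a resource that is already gone, or still shared with another LB, is not an error.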
diff --git a/pkg/cloudprovider/providers/gce/gce_zones.go b/pkg/cloudprovider/providers/gce/gce_zones.go
index c8cec5694fb..881fe652fc1 100644
--- a/pkg/cloudprovider/providers/gce/gce_zones.go
+++ b/pkg/cloudprovider/providers/gce/gce_zones.go
@@ -16,11 +16,41 @@ limitations under the License.
 
 package gce
 
-import "k8s.io/kubernetes/pkg/cloudprovider"
+import (
+	"fmt"
+	"time"
+
+	compute "google.golang.org/api/compute/v1"
+
+	"k8s.io/kubernetes/pkg/cloudprovider"
+)
+
+func newZonesMetricContext(request, region string) *metricContext {
+	return &metricContext{
+		start:      time.Now(),
+		attributes: []string{"zones_" + request, region, unusedMetricLabel},
+	}
+}
 
+// GetZone returns a cloudprovider.Zone describing the current zone and region
 func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
 	return cloudprovider.Zone{
 		FailureDomain: gce.localZone,
 		Region:        gce.region,
 	}, nil
 }
+
+// ListZonesInRegion returns all zones in a GCP region
+func (gce *GCECloud) ListZonesInRegion(region string) ([]*compute.Zone, error) {
+	mc := newZonesMetricContext("list", region)
+	filter := fmt.Sprintf("region eq %v", gce.getRegionLink(region))
+	list, err := gce.service.Zones.List(gce.projectID).Filter(filter).Do()
+	if err != nil {
+		return nil, mc.Observe(err)
+	}
+	return list.Items, mc.Observe(err)
+}
+
+func (gce *GCECloud) getRegionLink(region string) string {
+	return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%v/regions/%v", gce.projectID, region)
+}
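
// Reviewer note - usage sketch: ensureInternalInstanceGroupsDeleted relies on
// this to enumerate candidate zones, roughly (region and group name are hypothetical):
//
//	zones, err := gce.ListZonesInRegion("us-central1")
//	for _, z := range zones {
//		_ = gce.DeleteInstanceGroup("k8s-ig--uid", z.Name)
//	}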
diff --git a/test/e2e/framework/ingress_utils.go b/test/e2e/framework/ingress_utils.go
index 4b9912bcb9a..6d6969cd8d0 100644
--- a/test/e2e/framework/ingress_utils.go
+++ b/test/e2e/framework/ingress_utils.go
@@ -717,9 +717,10 @@ func (cont *GCEIngressController) Init() {
 // invoking deleteStaticIPs.
 func (cont *GCEIngressController) CreateStaticIP(name string) string {
 	gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud)
-	ip, err := gceCloud.ReserveGlobalStaticIP(name, "")
+	addr := &compute.Address{Name: name}
+	ip, err := gceCloud.ReserveGlobalAddress(addr)
 	if err != nil {
-		if delErr := gceCloud.DeleteGlobalStaticIP(name); delErr != nil {
+		if delErr := gceCloud.DeleteGlobalAddress(name); delErr != nil {
 			if cont.isHTTPErrorCode(delErr, http.StatusNotFound) {
 				Logf("Static ip with name %v was not allocated, nothing to delete", name)
 			} else {
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index 0f31c013ed9..37cf069d130 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -5167,20 +5167,15 @@ func CleanupGCEResources(loadBalancerName string) (retErr error) {
 		!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
 		retErr = err
 	}
-	if err := gceCloud.DeleteForwardingRule(loadBalancerName); err != nil &&
+	if err := gceCloud.DeleteRegionForwardingRule(loadBalancerName, gceCloud.Region()); err != nil &&
 		!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
 		retErr = fmt.Errorf("%v\n%v", retErr, err)
 	}
-	if err := gceCloud.DeleteGlobalStaticIP(loadBalancerName); err != nil &&
+	if err := gceCloud.DeleteRegionAddress(loadBalancerName, gceCloud.Region()); err != nil &&
 		!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
 		retErr = fmt.Errorf("%v\n%v", retErr, err)
 	}
-	// This function shells out to gcloud, so we can't compare for NotFound errors.
-	// TODO: Invoke cloudprovider method directly instead.
-	if err := DeleteGCEStaticIP(loadBalancerName); err != nil {
-		Logf("%v", err)
-	}
 	var hcNames []string
 	hc, getErr := gceCloud.GetHttpHealthCheck(loadBalancerName)
 	if getErr != nil && !IsGoogleAPIHTTPErrorCode(getErr, http.StatusNotFound) {
 		return getErr
 	}
@@ -5190,7 +5185,7 @@ func CleanupGCEResources(loadBalancerName string) (retErr error) {
 	if hc != nil {
 		hcNames = append(hcNames, hc.Name)
 	}
-	if err := gceCloud.DeleteTargetPool(loadBalancerName, hcNames...); err != nil &&
+	if err := gceCloud.DeleteExternalTargetPoolAndChecks(loadBalancerName, gceCloud.Region(), hcNames...); err != nil &&
 		!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
 		retErr = fmt.Errorf("%v\n%v", retErr, err)
 	}
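
// Reviewer appendix (not part of the patch): a minimal sketch of how call
// sites migrate from the deleted gce_staticip.go helpers to the new address
// API. The names "my-ip" and "us-central1" are hypothetical; gce is a
// configured *GCECloud as obtained in the e2e framework above.
func exampleAddressMigration(gce *gcecloud.GCECloud) error {
	// Before: ip, err := gce.ReserveGlobalStaticIP("my-ip", "")
	addr, err := gce.ReserveGlobalAddress(&compute.Address{Name: "my-ip"})
	if err != nil {
		return err
	}
	defer func() {
		// Before: gce.DeleteGlobalStaticIP("my-ip")
		if delErr := gce.DeleteGlobalAddress(addr.Name); delErr != nil {
			Logf("failed to release global address: %v", delErr)
		}
	}()

	// Region-scoped (external LB) addresses now have first-class helpers too:
	regional, err := gce.ReserveRegionAddress(&compute.Address{Name: "my-ip"}, "us-central1")
	if err != nil {
		return err
	}
	return gce.DeleteRegionAddress(regional.Name, "us-central1")
}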