Merge pull request #46663 from nicksardo/gce-internallb

Automatic merge from submit-queue (batch tested with PRs 46550, 46663, 46816, 46820, 46460)

[GCE] Support internal load balancers

**What this PR does / why we need it**:
Allows users to expose K8s services externally of the K8s cluster but within their GCP network. 

Fixes #33483

**Important User Notes:**
- This is a beta feature. ILB could be enabled differently in the future. 
- Requires nodes having version 1.7.0+ (ILB requires health checking and a health check endpoint on kube-proxy has just been exposed)
- This cannot be used for intra-cluster communication. Do not call the load balancer IP from a K8s node/pod.  
- There is no reservation system for private IPs. You can specify a RFC 1918 address in `loadBalancerIP` field, but it could be lost to another VM or LB if service settings are modified.
- If you're running an ingress, your existing loadbalancer backend service must be using BalancingMode type `RATE` - not `UTILIZATION`. 
  - Option 1: With a 1.5.8+ or 1.6.4+ version master, delete all your ingresses, and re-create them.
  - Option 2: Migrate to a new cluster running 1.7.0. Considering ILB requires nodes with 1.7.0, this isn't a bad idea.
  - Option 3: Possible migration opportunity, but use at your own risk. More to come later.


**Reviewer Notes**:
Several files were renamed, so github thinks ~2k lines have changed. Review commits one-by-one to see the actual changes.

**Release note**:
```release-note
Support creation of GCP Internal Load Balancers from Service objects
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-05 16:43:41 -07:00 committed by GitHub
commit 4faf7f1f4c
19 changed files with 2162 additions and 1206 deletions

View File

@ -13,6 +13,8 @@ go_library(
srcs = [
"doc.go",
"gce.go",
"gce_addresses.go",
"gce_annotations.go",
"gce_backendservice.go",
"gce_cert.go",
"gce_clusterid.go",
@ -24,9 +26,12 @@ go_library(
"gce_instancegroup.go",
"gce_instances.go",
"gce_loadbalancer.go",
"gce_loadbalancer_external.go",
"gce_loadbalancer_internal.go",
"gce_loadbalancer_naming.go",
"gce_op.go",
"gce_routes.go",
"gce_staticip.go",
"gce_targetpool.go",
"gce_targetproxy.go",
"gce_urlmap.go",
"gce_util.go",

View File

@ -78,6 +78,8 @@ const (
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
// ClusterID contains functionality for getting (and initializing) the ingress-uid. Call GCECloud.Initialize()
// for the cloudprovider to start watching the configmap.
ClusterID ClusterID
service *compute.Service

View File

@ -0,0 +1,96 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"time"
compute "google.golang.org/api/compute/v1"
)
func newAddressMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"address_" + request, region, unusedMetricLabel},
}
}
// ReserveGlobalAddress creates a global address.
// Caller is allocated a random IP if they do not specify an ipAddress. If an
// ipAddress is specified, it must belong to the current project, eg: an
// ephemeral IP associated with a global forwarding rule.
func (gce *GCECloud) ReserveGlobalAddress(addr *compute.Address) (*compute.Address, error) {
mc := newAddressMetricContext("reserve", "")
op, err := gce.service.GlobalAddresses.Insert(gce.projectID, addr).Do()
if err != nil {
return nil, mc.Observe(err)
}
if err := gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetGlobalAddress(addr.Name)
}
// DeleteGlobalAddress deletes a global address by name.
func (gce *GCECloud) DeleteGlobalAddress(name string) error {
mc := newAddressMetricContext("delete", "")
op, err := gce.service.GlobalAddresses.Delete(gce.projectID, name).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
}
// GetGlobalAddress returns the global address by name.
func (gce *GCECloud) GetGlobalAddress(name string) (*compute.Address, error) {
mc := newAddressMetricContext("get", "")
v, err := gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// ReserveRegionAddress creates a region address
func (gce *GCECloud) ReserveRegionAddress(addr *compute.Address, region string) (*compute.Address, error) {
mc := newAddressMetricContext("reserve", region)
op, err := gce.service.Addresses.Insert(gce.projectID, region, addr).Do()
if err != nil {
return nil, mc.Observe(err)
}
if err := gce.waitForRegionOp(op, region, mc); err != nil {
return nil, err
}
return gce.GetRegionAddress(addr.Name, region)
}
// DeleteRegionAddress deletes a region address by name.
func (gce *GCECloud) DeleteRegionAddress(name, region string) error {
mc := newAddressMetricContext("delete", region)
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}
// GetRegionAddress returns the region address by name
func (gce *GCECloud) GetRegionAddress(name, region string) (*compute.Address, error) {
mc := newAddressMetricContext("get", region)
v, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
return v, mc.Observe(err)
}

View File

@ -0,0 +1,51 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import "k8s.io/kubernetes/pkg/api/v1"
type LoadBalancerType string
const (
// ServiceAnnotationLoadBalancerType is the annotation used on a service with type LoadBalancer
// dictates what specific kind of GCP LB should be assembled.
// Currently, only "internal" is supported.
ServiceAnnotationLoadBalancerType = "cloud.google.com/load-balancer-type"
LBTypeInternal LoadBalancerType = "internal"
)
// GetLoadBalancerAnnotationType returns the type of GCP load balancer which should be assembled.
func GetLoadBalancerAnnotationType(service *v1.Service) (LoadBalancerType, bool) {
v := LoadBalancerType("")
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
return v, false
}
l, ok := service.Annotations[ServiceAnnotationLoadBalancerType]
v = LoadBalancerType(l)
if !ok {
return v, false
}
switch v {
case LBTypeInternal:
return v, true
default:
return v, false
}
}

View File

@ -23,23 +23,23 @@ import (
compute "google.golang.org/api/compute/v1"
)
func newBackendServiceMetricContext(request string) *metricContext {
func newBackendServiceMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"backendservice_" + request, unusedMetricLabel, unusedMetricLabel},
attributes: []string{"backendservice_" + request, region, unusedMetricLabel},
}
}
// GetGlobalBackendService retrieves a backend by name.
func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendService, error) {
mc := newBackendServiceMetricContext("get")
mc := newBackendServiceMetricContext("get", "")
v, err := gce.service.BackendServices.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// UpdateGlobalBackendService applies the given BackendService as an update to an existing service.
func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("update")
mc := newBackendServiceMetricContext("update", "")
op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do()
if err != nil {
return mc.Observe(err)
@ -50,7 +50,7 @@ func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) erro
// DeleteGlobalBackendService deletes the given BackendService by name.
func (gce *GCECloud) DeleteGlobalBackendService(name string) error {
mc := newBackendServiceMetricContext("delete")
mc := newBackendServiceMetricContext("delete", "")
op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
@ -64,7 +64,7 @@ func (gce *GCECloud) DeleteGlobalBackendService(name string) error {
// CreateGlobalBackendService creates the given BackendService.
func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("create")
mc := newBackendServiceMetricContext("create", "")
op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do()
if err != nil {
return mc.Observe(err)
@ -75,7 +75,7 @@ func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) erro
// ListGlobalBackendServices lists all backend services in the project.
func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, error) {
mc := newBackendServiceMetricContext("list")
mc := newBackendServiceMetricContext("list", "")
// TODO: use PageToken to list all not just the first 500
v, err := gce.service.BackendServices.List(gce.projectID).Do()
return v, mc.Observe(err)
@ -85,7 +85,7 @@ func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, e
// name, in the given instanceGroup. The instanceGroupLink is the fully
// qualified self link of an instance group.
func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
mc := newBackendServiceMetricContext("get_health")
mc := newBackendServiceMetricContext("get_health", "")
groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
v, err := gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do()
return v, mc.Observe(err)
@ -93,25 +93,25 @@ func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLin
// GetRegionBackendService retrieves a backend by name.
func (gce *GCECloud) GetRegionBackendService(name, region string) (*compute.BackendService, error) {
mc := newBackendServiceMetricContext("get")
mc := newBackendServiceMetricContext("get", region)
v, err := gce.service.RegionBackendServices.Get(gce.projectID, region, name).Do()
return v, mc.Observe(err)
}
// UpdateRegionBackendService applies the given BackendService as an update to an existing service.
func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("update")
op, err := gce.service.RegionBackendServices.Update(gce.projectID, bg.Region, bg.Name, bg).Do()
func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService, region string) error {
mc := newBackendServiceMetricContext("update", region)
op, err := gce.service.RegionBackendServices.Update(gce.projectID, region, bg.Name, bg).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, bg.Region, mc)
return gce.waitForRegionOp(op, region, mc)
}
// DeleteRegionBackendService deletes the given BackendService by name.
func (gce *GCECloud) DeleteRegionBackendService(name, region string) error {
mc := newBackendServiceMetricContext("delete")
mc := newBackendServiceMetricContext("delete", region)
op, err := gce.service.RegionBackendServices.Delete(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
@ -124,19 +124,19 @@ func (gce *GCECloud) DeleteRegionBackendService(name, region string) error {
}
// CreateRegionBackendService creates the given BackendService.
func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("create")
op, err := gce.service.RegionBackendServices.Insert(gce.projectID, bg.Region, bg).Do()
func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService, region string) error {
mc := newBackendServiceMetricContext("create", region)
op, err := gce.service.RegionBackendServices.Insert(gce.projectID, region, bg).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, bg.Region, mc)
return gce.waitForRegionOp(op, region, mc)
}
// ListRegionBackendServices lists all backend services in the project.
func (gce *GCECloud) ListRegionBackendServices(region string) (*compute.BackendServiceList, error) {
mc := newBackendServiceMetricContext("list")
mc := newBackendServiceMetricContext("list", region)
// TODO: use PageToken to list all not just the first 500
v, err := gce.service.RegionBackendServices.List(gce.projectID, region).Do()
return v, mc.Observe(err)
@ -146,7 +146,7 @@ func (gce *GCECloud) ListRegionBackendServices(region string) (*compute.BackendS
// name, in the given instanceGroup. The instanceGroupLink is the fully
// qualified self link of an instance group.
func (gce *GCECloud) GetRegionalBackendServiceHealth(name, region string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
mc := newBackendServiceMetricContext("get_health")
mc := newBackendServiceMetricContext("get_health", region)
groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
v, err := gce.service.RegionBackendServices.GetHealth(gce.projectID, region, name, groupRef).Do()
return v, mc.Observe(err)

View File

@ -17,7 +17,6 @@ limitations under the License.
package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
@ -44,8 +43,7 @@ func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portR
}
op, err := gce.service.GlobalForwardingRules.Insert(gce.projectID, rule).Do()
if err != nil {
mc.Observe(err)
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
@ -56,13 +54,12 @@ func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portR
// SetProxyForGlobalForwardingRule links the given TargetHttp(s)Proxy with the given GlobalForwardingRule.
// targetProxyLink is the SelfLink of a TargetHttp(s)Proxy.
func (gce *GCECloud) SetProxyForGlobalForwardingRule(fw *compute.ForwardingRule, targetProxyLink string) error {
func (gce *GCECloud) SetProxyForGlobalForwardingRule(forwardingRuleName, targetProxyLink string) error {
mc := newForwardingRuleMetricContext("set_proxy", "")
op, err := gce.service.GlobalForwardingRules.SetTarget(
gce.projectID, fw.Name, &compute.TargetReference{Target: targetProxyLink}).Do()
gce.projectID, forwardingRuleName, &compute.TargetReference{Target: targetProxyLink}).Do()
if err != nil {
mc.Observe(err)
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
@ -73,13 +70,7 @@ func (gce *GCECloud) DeleteGlobalForwardingRule(name string) error {
mc := newForwardingRuleMetricContext("delete", "")
op, err := gce.service.GlobalForwardingRules.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
mc.Observe(nil)
return nil
}
mc.Observe(err)
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
@ -99,3 +90,41 @@ func (gce *GCECloud) ListGlobalForwardingRules() (*compute.ForwardingRuleList, e
v, err := gce.service.GlobalForwardingRules.List(gce.projectID).Do()
return v, mc.Observe(err)
}
// GetRegionForwardingRule returns the RegionalForwardingRule by name & region.
func (gce *GCECloud) GetRegionForwardingRule(name, region string) (*compute.ForwardingRule, error) {
mc := newForwardingRuleMetricContext("get", region)
v, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
return v, mc.Observe(err)
}
// ListRegionForwardingRules lists all RegionalForwardingRules in the project & region.
func (gce *GCECloud) ListRegionForwardingRules(region string) (*compute.ForwardingRuleList, error) {
mc := newForwardingRuleMetricContext("list", region)
// TODO: use PageToken to list all not just the first 500
v, err := gce.service.ForwardingRules.List(gce.projectID, region).Do()
return v, mc.Observe(err)
}
// CreateRegionForwardingRule creates and returns a
// RegionalForwardingRule that points to the given BackendService
func (gce *GCECloud) CreateRegionForwardingRule(rule *compute.ForwardingRule, region string) error {
mc := newForwardingRuleMetricContext("create", region)
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, rule).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}
// DeleteRegionForwardingRule deletes the RegionalForwardingRule by name & region.
func (gce *GCECloud) DeleteRegionForwardingRule(name, region string) error {
mc := newForwardingRuleMetricContext("delete", region)
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package gce
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api/v1"
@ -208,28 +207,12 @@ func GetNodesHealthCheckPort() int32 {
return lbNodesHealthCheckPort
}
// getNodesHealthCheckPath returns the health check path used by the GCE load
// GetNodesHealthCheckPath returns the health check path used by the GCE load
// balancers (l4) for performing health checks on nodes.
func getNodesHealthCheckPath() string {
func GetNodesHealthCheckPath() string {
return nodesHealthCheckPath
}
// makeNodesHealthCheckName returns name of the health check resource used by
// the GCE load balancers (l4) for performing health checks on nodes.
func makeNodesHealthCheckName(clusterID string) string {
return fmt.Sprintf("k8s-%v-node", clusterID)
}
// MakeHealthCheckFirewallName returns the firewall name used by the GCE load
// balancers (l4) for performing health checks.
func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
if isNodesHealthCheck {
// TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc.
return makeNodesHealthCheckName(clusterID) + "-http-hc"
}
return "k8s-" + hcName + "-http-hc"
}
// isAtLeastMinNodesHealthCheckVersion checks if a version is higher than
// `minNodesHealthCheckVersion`.
func isAtLeastMinNodesHealthCheckVersion(vstring string) bool {

View File

@ -81,20 +81,16 @@ func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, stat
// AddInstancesToInstanceGroup adds the given instances to the given
// instance group.
func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceNames []string) error {
func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error {
mc := newInstanceGroupMetricContext("add_instances", zone)
if len(instanceNames) == 0 {
if len(instanceRefs) == 0 {
return nil
}
// Adding the same instance twice will result in a 4xx error
instances := []*compute.InstanceReference{}
for _, ins := range instanceNames {
instances = append(instances, &compute.InstanceReference{Instance: makeHostURL(gce.projectID, zone, ins)})
}
op, err := gce.service.InstanceGroups.AddInstances(
gce.projectID, zone, name,
&compute.InstanceGroupsAddInstancesRequest{
Instances: instances,
Instances: instanceRefs,
}).Do()
if err != nil {
return mc.Observe(err)
@ -105,21 +101,16 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta
// RemoveInstancesFromInstanceGroup removes the given instances from
// the instance group.
func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceNames []string) error {
func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error {
mc := newInstanceGroupMetricContext("remove_instances", zone)
if len(instanceNames) == 0 {
if len(instanceRefs) == 0 {
return nil
}
instances := []*compute.InstanceReference{}
for _, ins := range instanceNames {
instanceLink := makeHostURL(gce.projectID, zone, ins)
instances = append(instances, &compute.InstanceReference{Instance: instanceLink})
}
op, err := gce.service.InstanceGroups.RemoveInstances(
gce.projectID, zone, name,
&compute.InstanceGroupsRemoveInstancesRequest{
Instances: instances,
Instances: instanceRefs,
}).Do()
if err != nil {
return mc.Observe(err)

View File

@ -33,6 +33,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
)
const (
defaultZone = ""
)
func newInstancesMetricContext(request, zone string) *metricContext {
@ -42,6 +47,34 @@ func newInstancesMetricContext(request, zone string) *metricContext {
}
}
func splitNodesByZone(nodes []*v1.Node) map[string][]*v1.Node {
zones := make(map[string][]*v1.Node)
for _, n := range nodes {
z := getZone(n)
if z != defaultZone {
zones[z] = append(zones[z], n)
}
}
return zones
}
func getZone(n *v1.Node) string {
zone, ok := n.Labels[kubeletapis.LabelZoneFailureDomain]
if !ok {
return defaultZone
}
return zone
}
// ToInstanceReferences returns instance references by links
func (gce *GCECloud) ToInstanceReferences(zone string, instanceNames []string) (refs []*compute.InstanceReference) {
for _, ins := range instanceNames {
instanceLink := makeHostURL(gce.projectID, zone, ins)
refs = append(refs, &compute.InstanceReference{Instance: instanceLink})
}
return refs
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (gce *GCECloud) NodeAddresses(_ types.NodeName) ([]v1.NodeAddress, error) {
internalIP, err := metadata.Get("instance/network-interfaces/0/ip")

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,952 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"fmt"
"net/http"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1"
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/cloudprovider"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
)
// ensureExternalLoadBalancer is the external implementation of LoadBalancer.EnsureLoadBalancer.
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
//
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) ensureExternalLoadBalancer(clusterName string, apiService *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
if len(nodes) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
}
hostNames := nodeNames(nodes)
supportsNodesHealthCheck := supportsNodesHealthCheck(nodes)
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return nil, err
}
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
loadBalancerIP := apiService.Spec.LoadBalancerIP
ports := apiService.Spec.Ports
portStr := []string{}
for _, p := range apiService.Spec.Ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
affinityType := apiService.Spec.SessionAffinity
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports)
if err != nil {
return nil, err
}
if !fwdRuleExists {
glog.V(2).Infof("Forwarding rule %v for Service %v/%v doesn't exist",
loadBalancerName, apiService.Namespace, apiService.Name)
}
// Make sure we know which IP address will be used and have properly reserved
// it as static before moving forward with the rest of our operations.
//
// We use static IP addresses when updating a load balancer to ensure that we
// can replace the load balancer's other components without changing the
// address its service is reachable on. We do it this way rather than always
// keeping the static IP around even though this is more complicated because
// it makes it less likely that we'll run into quota issues. Only 7 static
// IP addresses are allowed per region by default.
//
// We could let an IP be allocated for us when the forwarding rule is created,
// but we need the IP to set up the firewall rule, and we want to keep the
// forwarding rule creation as the last thing that needs to be done in this
// function in order to maintain the invariant that "if the forwarding rule
// exists, the LB has been fully created".
ipAddress := ""
// Through this process we try to keep track of whether it is safe to
// release the IP that was allocated. If the user specifically asked for
// an IP, we assume they are managing it themselves. Otherwise, we will
// release the IP in case of early-terminating failure or upon successful
// creating of the LB.
// TODO(#36535): boil this logic down into a set of component functions
// and key the flag values off of errors returned.
isUserOwnedIP := false // if this is set, we never release the IP
isSafeToReleaseIP := false
defer func() {
if isUserOwnedIP {
return
}
if isSafeToReleaseIP {
if err := gce.DeleteRegionAddress(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
} else if isNotFound(err) {
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): address %s is not reserved.", loadBalancerName, serviceName, ipAddress)
} else {
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
}
} else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
}
}()
if loadBalancerIP != "" {
// If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway).
if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil {
return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err)
} else if isStatic {
// The requested IP is a static IP, owned and managed by the user.
isUserOwnedIP = true
isSafeToReleaseIP = false
ipAddress = loadBalancerIP
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress)
} else if loadBalancerIP == fwdRuleIP {
// The requested IP is not a static IP, but is currently assigned
// to this forwarding rule, so we can keep it.
isUserOwnedIP = false
isSafeToReleaseIP = true
ipAddress, _, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// The requested IP is not static and it is not assigned to the
// current forwarding rule. It might be attached to a different
// rule or it might not be part of this project at all. Either
// way, we can't use it.
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err)
}
} else {
// The user did not request a specific IP.
isUserOwnedIP = false
// This will either allocate a new static IP if the forwarding rule didn't
// already have an IP, or it will promote the forwarding rule's current
// IP from ephemeral to static, or it will just get the IP if it is
// already static.
existed := false
ipAddress, existed, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
if existed {
// If the IP was not specifically requested by the user, but it
// already existed, it seems to be a failed update cycle. We can
// use this IP and try to run through the process again, but we
// should not release the IP unless it is explicitly flagged as OK.
isSafeToReleaseIP = false
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// For total clarity. The IP did not pre-exist and the user did
// not ask for a particular one, so we can release the IP in case
// of failure or success.
isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress)
}
}
// Deal with the firewall next. The reason we do this here rather than last
// is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getLoadBalancer checks for.
// Check if user specified the allow source range
sourceRanges, err := apiservice.GetLoadBalancerSourceRanges(apiService)
if err != nil {
return nil, err
}
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges)
if err != nil {
return nil, err
}
if firewallNeedsUpdate {
desc := makeFirewallDescription(serviceName.String(), ipAddress)
// Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated.
if firewallExists {
glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName)
if err := gce.updateFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName)
} else {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName)
if err := gce.createFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName)
}
}
tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType)
if err != nil {
return nil, err
}
if !tpExists {
glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
}
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return nil, fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err)
}
// Check which health check needs to create and which health check needs to delete.
// Health check management is coupled with target pool operation to prevent leaking.
var hcToCreate, hcToDelete *compute.HttpHealthCheck
hcLocalTrafficExisting, err := gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
return nil, fmt.Errorf("error checking HTTP health check %s: %v", loadBalancerName, err)
}
if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
glog.V(4).Infof("service %v (%v) needs local traffic health checks on: %d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path)
if hcLocalTrafficExisting == nil {
// This logic exists to detect a transition for non-OnlyLocal to OnlyLocal service
// turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the
// target pool to use local traffic health check.
glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName)
if supportsNodesHealthCheck {
hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
tpNeedsUpdate = true
}
hcToCreate = makeHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
} else {
glog.V(4).Infof("Service %v needs nodes health checks.", apiService.Name)
if hcLocalTrafficExisting != nil {
// This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service
// and turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the
// target pool to use nodes health check.
glog.V(2).Infof("Updating from local traffic health checks to nodes health checks for service %v LB %v", apiService.Name, loadBalancerName)
hcToDelete = hcLocalTrafficExisting
tpNeedsUpdate = true
}
if supportsNodesHealthCheck {
hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
}
// Now we get to some slightly more interesting logic.
// First, neither target pools nor forwarding rules can be updated in place -
// they have to be deleted and recreated.
// Second, forwarding rules are layered on top of target pools in that you
// can't delete a target pool that's currently in use by a forwarding rule.
// Thus, we have to tear down the forwarding rule if either it or the target
// pool needs to be updated.
if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) {
// Begin critical section. If we have to delete the forwarding rule,
// and something should fail before we recreate it, don't release the
// IP. That way we can come back to it later.
isSafeToReleaseIP = false
if err := gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
// Pass healthchecks to DeleteExternalTargetPoolAndChecks to cleanup health checks after cleaning up the target pool itself.
var hcNames []string
if hcToDelete != nil {
hcNames = append(hcNames, hcToDelete.Name)
}
if err := gce.DeleteExternalTargetPoolAndChecks(loadBalancerName, gce.region, hcNames...); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
}
// Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new).
if tpNeedsUpdate {
createInstances := hosts
if len(hosts) > maxTargetPoolCreateInstances {
createInstances = createInstances[:maxTargetPoolCreateInstances]
}
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddress, gce.region, createInstances, affinityType, hcToCreate); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
if hcToCreate != nil {
glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks %v for target pool", loadBalancerName, serviceName, hcToCreate.Name)
}
if len(hosts) <= maxTargetPoolCreateInstances {
glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
} else {
glog.Infof("EnsureLoadBalancer(%v(%v)): created initial target pool (now updating with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances)
created := sets.NewString()
for _, host := range createInstances {
created.Insert(host.makeComparableHostPath())
}
if err := gce.updateTargetPool(loadBalancerName, created, hosts); err != nil {
return nil, fmt.Errorf("failed to update target pool %s: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): updated target pool (with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances)
}
}
if tpNeedsUpdate || fwdRuleNeedsUpdate {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress)
if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err)
}
// End critical section. It is safe to release the static IP (which
// just demotes it to ephemeral) now that it is attached. In the case
// of a user-requested IP, the "is user-owned" flag will be set,
// preventing it from actually being released.
isSafeToReleaseIP = true
glog.Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress)
}
status := &v1.LoadBalancerStatus{}
status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddress}}
return status, nil
}
// updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
hosts, err := gce.getInstancesByNames(nodeNames(nodes))
if err != nil {
return err
}
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}
existing := sets.NewString()
for _, instance := range pool.Instances {
existing.Insert(hostURLToComparablePath(instance))
}
return gce.updateTargetPool(loadBalancerName, existing, hosts)
}
// ensureExternalLoadBalancerDeleted is the external implementation of LoadBalancer.EnsureLoadBalancerDeleted
func (gce *GCECloud) ensureExternalLoadBalancerDeleted(clusterName string, service *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
var hcNames []string
if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" {
hcToDelete, err := gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
return err
}
hcNames = append(hcNames, hcToDelete.Name)
} else {
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err)
}
// EnsureLoadBalancerDeleted() could be triggered by changing service from
// LoadBalancer type to others. In this case we have no idea whether it was
// using local traffic health check or nodes health check. Attempt to delete
// both to prevent leaking.
hcNames = append(hcNames, loadBalancerName)
hcNames = append(hcNames, makeNodesHealthCheckName(clusterID))
}
errs := utilerrors.AggregateGoroutines(
func() error { return ignoreNotFound(gce.DeleteFirewall(makeFirewallName(loadBalancerName))) },
// Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case.
func() error { return ignoreNotFound(gce.DeleteRegionAddress(loadBalancerName, gce.region)) },
func() error {
// The forwarding rule must be deleted before either the target pool can,
// unfortunately, so we have to do these two serially.
if err := ignoreNotFound(gce.DeleteRegionForwardingRule(loadBalancerName, gce.region)); err != nil {
return err
}
if err := gce.DeleteExternalTargetPoolAndChecks(loadBalancerName, gce.region, hcNames...); err != nil {
return err
}
return nil
},
)
if errs != nil {
return utilerrors.Flatten(errs)
}
return nil
}
func (gce *GCECloud) DeleteExternalTargetPoolAndChecks(name, region string, hcNames ...string) error {
if err := gce.DeleteTargetPool(name, region); err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error())
return err
}
// Deletion of health checks is allowed only after the TargetPool reference is deleted
for _, hcName := range hcNames {
if err := func() error {
// Check whether it is nodes health check, which has different name from the load-balancer.
isNodesHealthCheck := hcName != name
if isNodesHealthCheck {
// Lock to prevent deleting necessary nodes health check before it gets attached
// to target pool.
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
glog.Infof("Deleting health check %v", hcName)
if err := gce.DeleteHttpHealthCheck(hcName); err != nil {
// Delete nodes health checks will fail if any other target pool is using it.
if isInUsedByError(err) {
glog.V(4).Infof("Health check %v is in used: %v.", hcName, err)
return nil
} else if !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Warningf("Failed to delete health check %v: %v", hcName, err)
return err
}
// StatusNotFound could happen when:
// - This is the first attempt but we pass in a healthcheck that is already deleted
// to prevent leaking.
// - This is the first attempt but user manually deleted the heathcheck.
// - This is a retry and in previous round we failed to delete the healthcheck firewall
// after deleted the healthcheck.
// We continue to delete the healthcheck firewall to prevent leaking.
glog.V(4).Infof("Health check %v is already deleted.", hcName)
}
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID: %v", err)
}
// If health check is deleted without error, it means no load-balancer is using it.
// So we should delete the health check firewall as well.
fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
glog.Infof("Deleting firewall %v.", fwName)
if err := gce.DeleteFirewall(fwName); err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
glog.V(4).Infof("Firewall %v is already deleted.", fwName)
return nil
}
return err
}
return nil
}(); err != nil {
return err
}
}
return nil
}
func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
// health check management is coupled with targetPools to prevent leaks. A
// target pool is the only thing that requires a health check, so we delete
// associated checks on teardown, and ensure checks on setup.
hcLinks := []string{}
if hc != nil {
// Check whether it is nodes health check, which has different name from the load-balancer.
isNodesHealthCheck := hc.Name != name
if isNodesHealthCheck {
// Lock to prevent necessary nodes health check / firewall gets deleted.
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
if !gce.OnXPN() {
if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
return err
}
}
var err error
if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil {
return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hc.Port, hc.RequestPath, err)
}
hcLinks = append(hcLinks, hc.SelfLink)
}
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
glog.Infof("Creating targetpool %v with %d healthchecks", name, len(hcLinks))
pool := &compute.TargetPool{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
Instances: instances,
SessionAffinity: translateAffinityType(affinityType),
HealthChecks: hcLinks,
}
if _, err := gce.CreateTargetPool(pool, region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.String, hosts []*gceInstance) error {
var toAdd []*compute.InstanceReference
var toRemove []*compute.InstanceReference
for _, host := range hosts {
link := host.makeComparableHostPath()
if !existing.Has(link) {
toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
}
existing.Delete(link)
}
for link := range existing {
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
}
if len(toAdd) > 0 {
if err := gce.AddInstancesToTargetPool(loadBalancerName, gce.region, toAdd); err != nil {
return err
}
}
if len(toRemove) > 0 {
if err := gce.RemoveInstancesFromTargetPool(loadBalancerName, gce.region, toRemove); err != nil {
return err
}
}
// Try to verify that the correct number of nodes are now in the target pool.
// We've been bitten by a bug here before (#11327) where all nodes were
// accidentally removed and want to make similar problems easier to notice.
updatedPool, err := gce.GetTargetPool(loadBalancerName, gce.region)
if err != nil {
return err
}
if len(updatedPool.Instances) != len(hosts) {
glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
}
return nil
}
func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
}
func makeHttpHealthCheck(name, path string, port int32) *compute.HttpHealthCheck {
return &compute.HttpHealthCheck{
Name: name,
Port: int64(port),
RequestPath: path,
Host: "",
Description: makeHealthCheckDescription(name),
CheckIntervalSec: gceHcCheckIntervalSeconds,
TimeoutSec: gceHcTimeoutSeconds,
HealthyThreshold: gceHcHealthyThreshold,
UnhealthyThreshold: gceHcUnhealthyThreshold,
}
}
func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) {
newHC := makeHttpHealthCheck(name, path, port)
hc, err = gce.GetHttpHealthCheck(name)
if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path)
if err = gce.CreateHttpHealthCheck(newHC); err != nil {
return nil, err
}
hc, err = gce.GetHttpHealthCheck(name)
if err != nil {
glog.Errorf("Failed to get http health check %v", err)
return nil, err
}
glog.Infof("Created HTTP health check %v healthCheckNodePort: %d", name, port)
return hc, nil
}
// Validate health check fields
glog.V(4).Infof("Checking http health check params %s", name)
drift := hc.Port != int64(port) || hc.RequestPath != path || hc.Description != makeHealthCheckDescription(name)
drift = drift || hc.CheckIntervalSec != gceHcCheckIntervalSeconds || hc.TimeoutSec != gceHcTimeoutSeconds
drift = drift || hc.UnhealthyThreshold != gceHcUnhealthyThreshold || hc.HealthyThreshold != gceHcHealthyThreshold
if drift {
glog.Warningf("Health check %v exists but parameters have drifted - updating...", name)
if err := gce.UpdateHttpHealthCheck(newHC); err != nil {
glog.Warningf("Failed to reconcile http health check %v parameters", name)
return nil, err
}
glog.V(4).Infof("Corrected health check %v parameters successful", name)
}
return hc, nil
}
// Passing nil for requested IP is perfectly fine - it just means that no specific
// IP is being requested.
// Returns whether the forwarding rule exists, whether it needs to be updated,
// what its IP address is (if it exists), and any error we encountered.
func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []v1.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, "", nil
}
// Err on the side of caution in case of errors. Caller should notice the error and retry.
// We never want to end up recreating resources because gce api flaked.
return true, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
}
// If the user asks for a specific static ip through the Service spec,
// check that we're actually using it.
// TODO: we report loadbalancer IP through status, so we want to verify if
// that matches the forwarding rule as well.
if loadBalancerIP != "" && loadBalancerIP != fwd.IPAddress {
glog.Infof("LoadBalancer ip for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPAddress, loadBalancerIP)
return true, true, fwd.IPAddress, nil
}
portRange, err := loadBalancerPortRange(ports)
if err != nil {
// Err on the side of caution in case of errors. Caller should notice the error and retry.
// We never want to end up recreating resources because gce api flaked.
return true, false, "", err
}
if portRange != fwd.PortRange {
glog.Infof("LoadBalancer port range for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.PortRange, portRange)
return true, true, fwd.IPAddress, nil
}
// The service controller verified all the protocols match on the ports, just check the first one
if string(ports[0].Protocol) != fwd.IPProtocol {
glog.Infof("LoadBalancer protocol for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPProtocol, string(ports[0].Protocol))
return true, true, fwd.IPAddress, nil
}
return true, false, fwd.IPAddress, nil
}
// Doesn't check whether the hosts have changed, since host updating is handled
// separately.
func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsUpdate bool, err error) {
tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
// Err on the side of caution in case of errors. Caller should notice the error and retry.
// We never want to end up recreating resources because gce api flaked.
return true, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
// TODO: If the user modifies their Service's session affinity, it *should*
// reflect in the associated target pool. However, currently not setting the
// session affinity on a target pool defaults it to the empty string while
// not setting in on a Service defaults it to None. There is a lack of
// documentation around the default setting for the target pool, so if we
// find it's the undocumented empty string, don't blindly recreate the
// target pool (which results in downtime). Fix this when we have formally
// defined the defaults on either side.
if tp.SessionAffinity != "" && translateAffinityType(affinityType) != tp.SessionAffinity {
glog.Infof("LoadBalancer target pool %v changed affinity from %v to %v", name, tp.SessionAffinity, affinityType)
return true, true, nil
}
return true, false, nil
}
func (h *gceInstance) makeComparableHostPath() string {
return fmt.Sprintf("/zones/%s/instances/%s", h.Zone, h.Name)
}
func nodeNames(nodes []*v1.Node) []string {
ret := make([]string, len(nodes))
for i, node := range nodes {
ret[i] = node.Name
}
return ret
}
func makeHostURL(projectID, zone, host string) string {
host = canonicalizeInstanceName(host)
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/instances/%s",
projectID, zone, host)
}
func hostURLToComparablePath(hostURL string) string {
idx := strings.Index(hostURL, "/zones/")
if idx < 0 {
return ""
}
return hostURL[idx:]
}
func loadBalancerPortRange(ports []v1.ServicePort) (string, error) {
if len(ports) == 0 {
return "", fmt.Errorf("no ports specified for GCE load balancer")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
if ports[0].Protocol != v1.ProtocolTCP && ports[0].Protocol != v1.ProtocolUDP {
return "", fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(ports[0].Protocol))
}
minPort := int32(65536)
maxPort := int32(0)
for i := range ports {
if ports[i].Port < minPort {
minPort = ports[i].Port
}
if ports[i].Port > maxPort {
maxPort = ports[i].Port
}
}
return fmt.Sprintf("%d-%d", minPort, maxPort), nil
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType v1.ServiceAffinity) string {
switch affinityType {
case v1.ServiceAffinityClientIP:
return gceAffinityTypeClientIP
case v1.ServiceAffinityNone:
return gceAffinityTypeNone
default:
glog.Errorf("Unexpected affinity type: %v", affinityType)
return gceAffinityTypeNone
}
}
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []v1.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) {
if gce.OnXPN() {
glog.V(2).Infoln("firewallNeedsUpdate: Cluster is on XPN network - skipping firewall creation")
return false, false, nil
}
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's firewall: %v", err)
}
if fw.Description != makeFirewallDescription(serviceName, ipAddress) {
return true, true, nil
}
if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") {
return true, true, nil
}
// Make sure the allowed ports match.
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port))
}
if !equalStringSets(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil
}
// The service controller already verified that the protocol matches on all ports, no need to check.
actualSourceRanges, err := netsets.ParseIPNets(fw.SourceRanges...)
if err != nil {
// This really shouldn't happen... GCE has returned something unexpected
glog.Warningf("Error parsing firewall SourceRanges: %v", fw.SourceRanges)
// We don't return the error, because we can hopefully recover from this by reconfiguring the firewall
return true, true, nil
}
if !sourceRanges.Equal(actualSourceRanges) {
return true, true, nil
}
return true, false, nil
}
func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, region string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error {
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID: %v", err)
}
// Prepare the firewall params for creating / checking.
desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID)
if !isNodesHealthCheck {
desc = makeFirewallDescription(serviceName, ipAddress)
}
sourceRanges := lbSrcRngsFlag.ipn
ports := []v1.ServicePort{{Protocol: "tcp", Port: hcPort}}
fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
fw, err := gce.service.Firewalls.Get(gce.projectID, fwName).Do()
if err != nil {
if !isHTTPErrorCode(err, http.StatusNotFound) {
return fmt.Errorf("error getting firewall for health checks: %v", err)
}
glog.Infof("Creating firewall %v for health checks.", fwName)
if err := gce.createFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
return err
}
glog.Infof("Created firewall %v for health checks.", fwName)
return nil
}
// Validate firewall fields.
if fw.Description != desc ||
len(fw.Allowed) != 1 ||
fw.Allowed[0].IPProtocol != string(ports[0].Protocol) ||
!equalStringSets(fw.Allowed[0].Ports, []string{string(ports[0].Port)}) ||
!equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) {
glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName)
if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
glog.Warningf("Failed to reconcile firewall %v parameters.", fwName)
return err
}
glog.V(4).Infof("Corrected firewall %v parameters successful", fwName)
}
return nil
}
func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []v1.ServicePort) error {
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return err
}
req := &compute.ForwardingRule{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
IPAddress: ipAddress,
IPProtocol: string(ports[0].Protocol),
PortRange: portRange,
Target: gce.targetPoolURL(name, region),
}
if err = gce.CreateRegionForwardingRule(req, region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}
if err = gce.CreateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}
if err = gce.UpdateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port))
}
// If the node tags to be used for this cluster have been predefined in the
// provider config, just use them. Otherwise, invoke computeHostTags method to get the tags.
hostTags := gce.nodeTags
if len(hostTags) == 0 {
var err error
if hostTags, err = gce.computeHostTags(hosts); err != nil {
return nil, fmt.Errorf("No node tags supplied and also failed to parse the given lists of hosts for tags. Abort creating firewall rule.")
}
}
firewall := &compute.Firewall{
Name: name,
Description: desc,
Network: gce.networkURL,
SourceRanges: sourceRanges.StringSlice(),
TargetTags: hostTags,
Allowed: []*compute.FirewallAllowed{
{
// TODO: Make this more generic. Currently this method is only
// used to create firewall rules for loadbalancers, which have
// exactly one protocol, so we can never end up with a list of
// mixed TCP and UDP ports. It should be possible to use a
// single firewall rule for both a TCP and UDP lb.
IPProtocol: strings.ToLower(string(ports[0].Protocol)),
Ports: allowedPorts,
},
},
}
return firewall, nil
}
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Addresses.List(gce.projectID, region)
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
addresses, err := listCall.Do()
if err != nil {
return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
}
pageToken = addresses.NextPageToken
for _, addr := range addresses.Items {
if addr.Address == ipAddress {
// This project does own the address, so return success.
return true, nil
}
}
}
if page >= maxPages {
glog.Errorf("projectOwnsStaticIP exceeded maxPages=%d for Addresses.List; truncating.", maxPages)
}
return false, nil
}
func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string) (ipAddress string, created bool, err error) {
// If the address doesn't exist, this will create it.
// If the existingIP exists but is ephemeral, this will promote it to static.
// If the address already exists, this will harmlessly return a StatusConflict
// and we'll grab the IP before returning.
existed := false
addressObj := &compute.Address{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
}
if existingIP != "" {
addressObj.Address = existingIP
}
address, err := gce.ReserveRegionAddress(addressObj, region)
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", false, fmt.Errorf("error creating gce static IP address: %v", err)
}
// StatusConflict == the IP exists already.
existed = true
}
return address.Address, existed, nil
}

View File

@ -0,0 +1,636 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"fmt"
"strconv"
"strings"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1"
v1_service "k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/cloudprovider"
)
const (
allInstances = "ALL"
)
type lbBalancingMode string
func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
nm := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
ports, protocol := getPortsAndProtocol(svc.Spec.Ports)
scheme := schemeInternal
loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
shared := !v1_service.RequestsOnlyLocalTraffic(svc)
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shared, scheme, protocol, svc.Spec.SessionAffinity)
backendServiceLink := gce.getBackendServiceLink(backendServiceName)
// Ensure instance groups exist and nodes are assigned to groups
igName := makeInstanceGroupName(clusterID)
igLinks, err := gce.ensureInternalInstanceGroups(igName, nodes)
if err != nil {
return nil, err
}
// Lock the sharedResourceLock to prevent any deletions of shared resources while assembling shared resources here
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
// Ensure health check for backend service is created
// By default, use the node health check endpoint
hcName := makeHealthCheckName(loadBalancerName, clusterID, shared)
hcPath, hcPort := GetNodesHealthCheckPath(), GetNodesHealthCheckPort()
if !shared {
// Service requires a special health check, retrieve the OnlyLocal port & path
hcPath, hcPort = v1_service.GetServiceHealthCheckPathPort(svc)
}
hc, err := gce.ensureInternalHealthCheck(hcName, nm, shared, hcPath, hcPort)
if err != nil {
return nil, err
}
// Ensure firewall rules if necessary
if gce.OnXPN() {
glog.V(2).Infof("ensureInternalLoadBalancer: cluster is on a cross-project network (XPN) network project %v, compute project %v - skipping firewall creation", gce.networkProjectID, gce.projectID)
} else {
if err = gce.ensureInternalFirewalls(loadBalancerName, clusterID, nm, svc, strconv.Itoa(int(hcPort)), shared, nodes); err != nil {
return nil, err
}
}
expectedFwdRule := &compute.ForwardingRule{
Name: loadBalancerName,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, nm.String()),
IPAddress: svc.Spec.LoadBalancerIP,
BackendService: backendServiceLink,
Ports: ports,
IPProtocol: string(protocol),
LoadBalancingScheme: string(scheme),
}
// Specify subnetwork if network type is manual
if len(gce.subnetworkURL) > 0 {
expectedFwdRule.Subnetwork = gce.subnetworkURL
} else {
expectedFwdRule.Network = gce.networkURL
}
// Delete the previous internal load balancer if necessary
fwdRuleDeleted, err := gce.clearExistingInternalLB(loadBalancerName, existingFwdRule, expectedFwdRule, backendServiceName)
if err != nil {
return nil, err
}
bsDescription := makeBackendServiceDescription(nm, shared)
err = gce.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink)
if err != nil {
return nil, err
}
// If we previously deleted the forwarding rule or it never existed, finally create it.
if fwdRuleDeleted || existingFwdRule == nil {
glog.V(2).Infof("ensureInternalLoadBalancer(%v(%v)): creating forwarding rule", loadBalancerName, svc.Name)
if err = gce.CreateRegionForwardingRule(expectedFwdRule, gce.region); err != nil {
return nil, err
}
}
// Get the most recent forwarding rule for the new address.
existingFwdRule, err = gce.GetRegionForwardingRule(loadBalancerName, gce.region)
if err != nil {
return nil, err
}
status := &v1.LoadBalancerStatus{}
status.Ingress = []v1.LoadBalancerIngress{{IP: existingFwdRule.IPAddress}}
return status, nil
}
func (gce *GCECloud) clearExistingInternalLB(loadBalancerName string, existingFwdRule, expectedFwdRule *compute.ForwardingRule, expectedBSName string) (fwdRuleDeleted bool, err error) {
if existingFwdRule == nil {
return false, nil
}
if !fwdRuleEqual(existingFwdRule, expectedFwdRule) {
glog.V(2).Infof("clearExistingInternalLB(%v: deleting existing forwarding rule with IP address %v", loadBalancerName, existingFwdRule.IPAddress)
if err = gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
return false, err
}
fwdRuleDeleted = true
}
existingBSName := getNameFromLink(existingFwdRule.BackendService)
bs, err := gce.GetRegionBackendService(existingBSName, gce.region)
if err == nil {
if bs.Name != expectedBSName {
glog.V(2).Infof("clearExistingInternalLB(%v): expected backend service %q does not match actual %q - deleting backend service & healthcheck & firewall", loadBalancerName, expectedBSName, bs.Name)
// Delete the backend service as well in case it's switching between shared, nonshared, tcp, udp.
var existingHCName string
if len(bs.HealthChecks) == 1 {
existingHCName = getNameFromLink(bs.HealthChecks[0])
}
if err = gce.teardownInternalBackendResources(existingBSName, existingHCName); err != nil {
glog.Warningf("clearExistingInternalLB: could not delete old resources: %v", err)
} else {
glog.V(2).Infof("clearExistingInternalLB: done deleting old resources")
}
}
} else {
glog.Warningf("clearExistingInternalLB(%v): failed to retrieve existing backend service %v", loadBalancerName, existingBSName)
}
return fwdRuleDeleted, nil
}
func (gce *GCECloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v1.Service, nodes []*v1.Node) error {
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
igName := makeInstanceGroupName(clusterID)
igLinks, err := gce.ensureInternalInstanceGroups(igName, nodes)
if err != nil {
return err
}
// Generate the backend service name
_, protocol := getPortsAndProtocol(svc.Spec.Ports)
scheme := schemeInternal
loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
shared := !v1_service.RequestsOnlyLocalTraffic(svc)
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shared, scheme, protocol, svc.Spec.SessionAffinity)
// Ensure the backend service has the proper backend instance-group links
return gce.ensureInternalBackendServiceGroups(backendServiceName, igLinks)
}
func (gce *GCECloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, svc *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
_, protocol := getPortsAndProtocol(svc.Spec.Ports)
scheme := schemeInternal
shared := !v1_service.RequestsOnlyLocalTraffic(svc)
var err error
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule", loadBalancerName)
if err = gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
return err
}
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shared, scheme, protocol, svc.Spec.SessionAffinity)
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName)
if err = gce.DeleteRegionBackendService(backendServiceName, gce.region); err != nil && !isNotFoundOrInUse(err) {
return err
}
// Only delete the health check & health check firewall if they aren't being used by another LB. If we get
// an ResourceInuseBy error, then we can skip deleting the firewall.
hcInUse := false
hcName := makeHealthCheckName(loadBalancerName, clusterID, shared)
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting health check %v", loadBalancerName, hcName)
if err = gce.DeleteHealthCheck(hcName); err != nil && !isNotFoundOrInUse(err) {
return err
} else if isInUsedByError(err) {
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): healthcheck %v still in use", loadBalancerName, hcName)
hcInUse = true
}
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting firewall for traffic", loadBalancerName)
if err = gce.DeleteFirewall(loadBalancerName); err != nil {
return err
}
if hcInUse {
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): skipping firewall for healthcheck", loadBalancerName)
} else {
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting firewall for healthcheck", loadBalancerName)
fwHCName := makeHealthCheckFirewallkName(loadBalancerName, clusterID, shared)
if err = gce.DeleteFirewall(fwHCName); err != nil && !isInUsedByError(err) {
return err
}
}
// Try deleting instance groups - expect ResourceInuse error if needed by other LBs
igName := makeInstanceGroupName(clusterID)
if err = gce.ensureInternalInstanceGroupsDeleted(igName); err != nil && !isInUsedByError(err) {
return err
}
return nil
}
func (gce *GCECloud) teardownInternalBackendResources(bsName, hcName string) error {
if err := gce.DeleteRegionBackendService(bsName, gce.region); err != nil {
if isNotFound(err) {
glog.V(2).Infof("backend service already deleted: %v, err: %v", bsName, err)
} else if err != nil && isInUsedByError(err) {
glog.V(2).Infof("backend service in use: %v, err: %v", bsName, err)
} else {
return fmt.Errorf("failed to delete backend service: %v, err: %v", bsName, err)
}
}
if hcName == "" {
return nil
}
hcInUse := false
if err := gce.DeleteHealthCheck(hcName); err != nil {
if isNotFound(err) {
glog.V(2).Infof("health check already deleted: %v, err: %v", hcName, err)
} else if err != nil && isInUsedByError(err) {
hcInUse = true
glog.V(2).Infof("health check in use: %v, err: %v", hcName, err)
} else {
return fmt.Errorf("failed to delete health check: %v, err: %v", hcName, err)
}
}
hcFirewallName := makeHealthCheckFirewallkNameFromHC(hcName)
if hcInUse {
glog.V(2).Infof("skipping deletion of health check firewall: %v", hcFirewallName)
return nil
}
if err := gce.DeleteFirewall(hcFirewallName); err != nil && !isNotFound(err) {
return fmt.Errorf("failed to delete health check firewall: %v, err: %v", hcFirewallName, err)
}
return nil
}
func (gce *GCECloud) ensureInternalFirewall(fwName, fwDesc string, sourceRanges []string, ports []string, protocol v1.Protocol, nodes []*v1.Node) error {
glog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
targetTags, err := gce.GetNodeTags(nodeNames(nodes))
if err != nil {
return err
}
existingFirewall, err := gce.GetFirewall(fwName)
if err != nil && !isNotFound(err) {
return err
}
expectedFirewall := &compute.Firewall{
Name: fwName,
Description: fwDesc,
Network: gce.networkURL,
SourceRanges: sourceRanges,
TargetTags: targetTags,
Allowed: []*compute.FirewallAllowed{
{
IPProtocol: strings.ToLower(string(protocol)),
Ports: ports,
},
},
}
if existingFirewall == nil {
glog.V(2).Infof("ensureInternalFirewall(%v): creating firewall", fwName)
return gce.CreateFirewall(expectedFirewall)
}
if firewallRuleEqual(expectedFirewall, existingFirewall) {
return nil
}
glog.V(2).Infof("ensureInternalFirewall(%v): updating firewall", fwName)
return gce.UpdateFirewall(expectedFirewall)
}
func (gce *GCECloud) ensureInternalFirewalls(loadBalancerName, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, shared bool, nodes []*v1.Node) error {
// First firewall is for ingress traffic
fwDesc := makeFirewallDescription(nm.String(), svc.Spec.LoadBalancerIP)
ports, protocol := getPortsAndProtocol(svc.Spec.Ports)
sourceRanges, err := v1_service.GetLoadBalancerSourceRanges(svc)
if err != nil {
return err
}
err = gce.ensureInternalFirewall(loadBalancerName, fwDesc, sourceRanges.StringSlice(), ports, protocol, nodes)
if err != nil {
return err
}
// Second firewall is for health checking nodes / services
fwHCName := makeHealthCheckFirewallkName(loadBalancerName, clusterID, shared)
hcSrcRanges := LoadBalancerSrcRanges()
return gce.ensureInternalFirewall(fwHCName, "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes)
}
func (gce *GCECloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) {
glog.V(2).Infof("ensureInternalHealthCheck(%v, %v, %v): checking existing health check", name, path, port)
expectedHC := newInternalLBHealthCheck(name, svcName, shared, path, port)
hc, err := gce.GetHealthCheck(name)
if err != nil && !isNotFound(err) {
return nil, err
}
if hc == nil {
glog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
if err = gce.CreateHealthCheck(expectedHC); err != nil {
return nil, err
}
hc, err = gce.GetHealthCheck(name)
if err != nil {
glog.Errorf("Failed to get http health check %v", err)
return nil, err
}
glog.V(2).Infof("ensureInternalHealthCheck: created health check %v", name)
return hc, nil
}
if healthChecksEqual(expectedHC, hc) {
return hc, nil
}
glog.V(2).Infof("ensureInternalHealthCheck: health check %v exists but parameters have drifted - updating...", name)
if err := gce.UpdateHealthCheck(expectedHC); err != nil {
glog.Warningf("Failed to reconcile http health check %v parameters", name)
return nil, err
}
glog.V(2).Infof("ensureInternalHealthCheck: corrected health check %v parameters successful", name)
return hc, nil
}
func (gce *GCECloud) ensureInternalInstanceGroup(name, zone string, nodes []*v1.Node) (string, error) {
glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): checking group that it contains %v nodes", name, zone, len(nodes))
ig, err := gce.GetInstanceGroup(name, zone)
if err != nil && !isNotFound(err) {
return "", err
}
kubeNodes := sets.NewString()
for _, n := range nodes {
kubeNodes.Insert(n.Name)
}
gceNodes := sets.NewString()
if ig == nil {
glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): creating instance group", name, zone)
ig, err = gce.CreateInstanceGroup(name, zone)
if err != nil {
return "", err
}
} else {
instances, err := gce.ListInstancesInInstanceGroup(name, zone, allInstances)
if err != nil {
return "", err
}
for _, ins := range instances.Items {
parts := strings.Split(ins.Instance, "/")
gceNodes.Insert(parts[len(parts)-1])
}
}
removeNodes := gceNodes.Difference(kubeNodes).List()
addNodes := kubeNodes.Difference(gceNodes).List()
if len(removeNodes) != 0 {
glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): removing nodes: %v", name, zone, removeNodes)
instanceRefs := gce.ToInstanceReferences(zone, removeNodes)
// Possible we'll receive 404's here if the instance was deleted before getting to this point.
if err = gce.RemoveInstancesFromInstanceGroup(name, zone, instanceRefs); err != nil && !isNotFound(err) {
return "", err
}
}
if len(addNodes) != 0 {
glog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): adding nodes: %v", name, zone, addNodes)
instanceRefs := gce.ToInstanceReferences(zone, addNodes)
if err = gce.AddInstancesToInstanceGroup(name, zone, instanceRefs); err != nil {
return "", err
}
}
return ig.SelfLink, nil
}
// ensureInternalInstanceGroups generates an unmanaged instance group for every zone
// where a K8s node exists. It also ensures that each node belongs to an instance group
func (gce *GCECloud) ensureInternalInstanceGroups(name string, nodes []*v1.Node) ([]string, error) {
zonedNodes := splitNodesByZone(nodes)
glog.V(2).Infof("ensureInternalInstanceGroups(%v): %d nodes over %d zones in region %v", name, len(nodes), len(zonedNodes), gce.region)
var igLinks []string
for zone, nodes := range zonedNodes {
igLink, err := gce.ensureInternalInstanceGroup(name, zone, nodes)
if err != nil {
return []string{}, err
}
igLinks = append(igLinks, igLink)
}
return igLinks, nil
}
func (gce *GCECloud) ensureInternalInstanceGroupsDeleted(name string) error {
// List of nodes isn't available here - fetch all zones in region and try deleting this cluster's ig
zones, err := gce.ListZonesInRegion(gce.region)
if err != nil {
return err
}
glog.V(2).Infof("ensureInternalInstanceGroupsDeleted(%v): deleting instance group in all %d zones", name, len(zones))
for _, z := range zones {
if err := gce.DeleteInstanceGroup(name, z.Name); err != nil && !isNotFound(err) {
return err
}
}
return nil
}
func (gce *GCECloud) ensureInternalBackendService(name, description string, affinityType v1.ServiceAffinity, scheme lbScheme, protocol v1.Protocol, igLinks []string, hcLink string) error {
glog.V(2).Infof("ensureInternalBackendService(%v, %v, %v): checking existing backend service with %d groups", name, scheme, protocol, len(igLinks))
bs, err := gce.GetRegionBackendService(name, gce.region)
if err != nil && !isNotFound(err) {
return err
}
backends := backendsFromGroupLinks(igLinks)
expectedBS := &compute.BackendService{
Name: name,
Protocol: string(protocol),
Description: description,
HealthChecks: []string{hcLink},
Backends: backends,
SessionAffinity: translateAffinityType(affinityType),
LoadBalancingScheme: string(scheme),
}
// Create backend service if none was found
if bs == nil {
glog.V(2).Infof("ensureInternalBackendService: creating backend service %v", name)
err := gce.CreateRegionBackendService(expectedBS, gce.region)
if err != nil {
return err
}
glog.V(2).Infof("ensureInternalBackendService: created backend service %v successfully", name)
return nil
}
// Check existing backend service
existingIGLinks := sets.NewString()
for _, be := range bs.Backends {
existingIGLinks.Insert(be.Group)
}
if backendSvcEqual(expectedBS, bs) {
return nil
}
glog.V(2).Infof("ensureInternalBackendService: updating backend service %v", name)
if err := gce.UpdateRegionBackendService(expectedBS, gce.region); err != nil {
return err
}
glog.V(2).Infof("ensureInternalBackendService: updated backend service %v successfully", name)
return nil
}
func (gce *GCECloud) ensureInternalBackendServiceGroups(name string, igLinks []string) error {
glog.V(2).Infof("ensureInternalBackendServiceGroups(%v): checking existing backend service's groups", name)
bs, err := gce.GetRegionBackendService(name, gce.region)
if err != nil {
return err
}
backends := backendsFromGroupLinks(igLinks)
if backendsListEqual(bs.Backends, backends) {
return nil
}
glog.V(2).Infof("ensureInternalBackendServiceGroups: updating backend service %v", name)
bs.Backends = backends
if err := gce.UpdateRegionBackendService(bs, gce.region); err != nil {
return err
}
glog.V(2).Infof("ensureInternalBackendServiceGroups: updated backend service %v successfully", name)
return nil
}
func backendsFromGroupLinks(igLinks []string) []*compute.Backend {
var backends []*compute.Backend
for _, igLink := range igLinks {
backends = append(backends, &compute.Backend{
Group: igLink,
})
}
return backends
}
func newInternalLBHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) *compute.HealthCheck {
httpSettings := compute.HTTPHealthCheck{
Port: int64(port),
RequestPath: path,
}
desc := ""
if !shared {
desc = makeHealthCheckDescription(svcName.String())
}
return &compute.HealthCheck{
Name: name,
CheckIntervalSec: gceHcCheckIntervalSeconds,
TimeoutSec: gceHcTimeoutSeconds,
HealthyThreshold: gceHcHealthyThreshold,
UnhealthyThreshold: gceHcUnhealthyThreshold,
HttpHealthCheck: &httpSettings,
Type: "HTTP",
Description: desc,
}
}
func firewallRuleEqual(a, b *compute.Firewall) bool {
return a.Description == b.Description &&
len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) &&
a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol &&
equalStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) &&
equalStringSets(a.SourceRanges, b.SourceRanges) &&
equalStringSets(a.TargetTags, b.TargetTags)
}
func healthChecksEqual(a, b *compute.HealthCheck) bool {
return a.HttpHealthCheck != nil && b.HttpHealthCheck != nil &&
a.HttpHealthCheck.Port == b.HttpHealthCheck.Port &&
a.HttpHealthCheck.RequestPath == b.HttpHealthCheck.RequestPath &&
a.Description == b.Description &&
a.CheckIntervalSec == b.CheckIntervalSec &&
a.TimeoutSec == b.TimeoutSec &&
a.UnhealthyThreshold == b.UnhealthyThreshold &&
a.HealthyThreshold == b.HealthyThreshold
}
// backendsListEqual asserts that backend lists are equal by instance group link only
func backendsListEqual(a, b []*compute.Backend) bool {
if len(a) != len(b) {
return false
}
if len(a) == 0 {
return true
}
aSet := sets.NewString()
for _, v := range a {
aSet.Insert(v.Group)
}
bSet := sets.NewString()
for _, v := range b {
bSet.Insert(v.Group)
}
return aSet.Equal(bSet)
}
func backendSvcEqual(a, b *compute.BackendService) bool {
return a.Protocol == b.Protocol &&
a.Description == b.Description &&
a.SessionAffinity == b.SessionAffinity &&
a.LoadBalancingScheme == b.LoadBalancingScheme &&
equalStringSets(a.HealthChecks, b.HealthChecks) &&
backendsListEqual(a.Backends, b.Backends)
}
func fwdRuleEqual(a, b *compute.ForwardingRule) bool {
return (a.IPAddress == "" || b.IPAddress == "" || a.IPAddress == b.IPAddress) &&
a.IPProtocol == b.IPProtocol &&
a.LoadBalancingScheme == b.LoadBalancingScheme &&
equalStringSets(a.Ports, b.Ports) &&
a.BackendService == b.BackendService
}
func getPortsAndProtocol(svcPorts []v1.ServicePort) (ports []string, protocol v1.Protocol) {
if len(svcPorts) == 0 {
return []string{}, v1.ProtocolUDP
}
// GCP doesn't support multiple protocols for a single load balancer
protocol = svcPorts[0].Protocol
for _, p := range svcPorts {
ports = append(ports, strconv.Itoa(int(p.Port)))
}
return ports, protocol
}
func (gce *GCECloud) getBackendServiceLink(name string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/backendServices/%s", gce.projectID, gce.region, name)
}
func getNameFromLink(link string) string {
fields := strings.Split(link, "/")
return fields[len(fields)-1]
}

View File

@ -0,0 +1,103 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"fmt"
"strings"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
)
// Internal Load Balancer
// Instance groups remain legacy named to stay consistent with ingress
func makeInstanceGroupName(clusterID string) string {
return fmt.Sprintf("k8s-ig--%s", clusterID)
}
func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme lbScheme, protocol v1.Protocol, svcAffinity v1.ServiceAffinity) string {
if shared {
affinity := ""
switch svcAffinity {
case v1.ServiceAffinityClientIP:
affinity = "clientip"
default:
affinity = "noaffinity"
}
return fmt.Sprintf("k8s-%s-%s-%s-%s", clusterID, strings.ToLower(string(scheme)), strings.ToLower(string(protocol)), affinity)
}
return loadBalancerName
}
func makeHealthCheckName(loadBalancerName, clusterID string, shared bool) string {
if shared {
return fmt.Sprintf("k8s-%s-node", clusterID)
}
return loadBalancerName
}
func makeHealthCheckFirewallkNameFromHC(healthCheckName string) string {
return healthCheckName + "-hc"
}
func makeHealthCheckFirewallkName(loadBalancerName, clusterID string, shared bool) string {
if shared {
return fmt.Sprintf("k8s-%s-node-hc", clusterID)
}
return loadBalancerName + "-hc"
}
func makeBackendServiceDescription(nm types.NamespacedName, shared bool) string {
if shared {
return ""
}
return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"`, nm.String())
}
// External Load Balancer
// makeNodesHealthCheckName returns name of the health check resource used by
// the GCE load balancers (l4) for performing health checks on nodes.
func makeNodesHealthCheckName(clusterID string) string {
return fmt.Sprintf("k8s-%v-node", clusterID)
}
func makeHealthCheckDescription(serviceName string) string {
return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName)
}
// MakeHealthCheckFirewallName returns the firewall name used by the GCE load
// balancers (l4) for performing health checks.
func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
if isNodesHealthCheck {
return makeNodesHealthCheckName(clusterID) + "-http-hc"
}
return "k8s-" + hcName + "-http-hc"
}
func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}
func makeFirewallDescription(serviceName, ipAddress string) string {
return fmt.Sprintf(`{"kubernetes.io/service-name":"%s", "kubernetes.io/service-ip":"%s"}`,
serviceName, ipAddress)
}

View File

@ -1,66 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"time"
compute "google.golang.org/api/compute/v1"
)
func newStaticIPMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"staticip_" + request, unusedMetricLabel, unusedMetricLabel},
}
}
// ReserveGlobalStaticIP creates a global static IP.
// Caller is allocated a random IP if they do not specify an ipAddress. If an
// ipAddress is specified, it must belong to the current project, eg: an
// ephemeral IP associated with a global forwarding rule.
func (gce *GCECloud) ReserveGlobalStaticIP(name, ipAddress string) (address *compute.Address, err error) {
mc := newStaticIPMetricContext("reserve")
op, err := gce.service.GlobalAddresses.Insert(gce.projectID, &compute.Address{Name: name, Address: ipAddress}).Do()
if err != nil {
return nil, mc.Observe(err)
}
if err := gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
}
// DeleteGlobalStaticIP deletes a global static IP by name.
func (gce *GCECloud) DeleteGlobalStaticIP(name string) error {
mc := newStaticIPMetricContext("delete")
op, err := gce.service.GlobalAddresses.Delete(gce.projectID, name).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
}
// GetGlobalStaticIP returns the global static IP by name.
func (gce *GCECloud) GetGlobalStaticIP(name string) (*compute.Address, error) {
mc := newStaticIPMetricContext("get")
v, err := gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"time"
compute "google.golang.org/api/compute/v1"
)
func newTargetPoolMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"targetpool_" + request, region, unusedMetricLabel},
}
}
// GetTargetPool returns the TargetPool by name.
func (gce *GCECloud) GetTargetPool(name, region string) (*compute.TargetPool, error) {
mc := newTargetPoolMetricContext("get", region)
v, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
return v, mc.Observe(err)
}
// CreateTargetPool creates the passed TargetPool
func (gce *GCECloud) CreateTargetPool(tp *compute.TargetPool, region string) (*compute.TargetPool, error) {
mc := newTargetPoolMetricContext("create", region)
op, err := gce.service.TargetPools.Insert(gce.projectID, region, tp).Do()
if err != nil {
return nil, mc.Observe(err)
}
if err := gce.waitForRegionOp(op, region, mc); err != nil {
return nil, err
}
return gce.GetTargetPool(tp.Name, region)
}
// DeleteTargetPool deletes TargetPool by name.
func (gce *GCECloud) DeleteTargetPool(name, region string) error {
mc := newTargetPoolMetricContext("delete", region)
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}
// AddInstancesToTargetPool adds instances by link to the TargetPool
func (gce *GCECloud) AddInstancesToTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error {
add := &compute.TargetPoolsAddInstanceRequest{Instances: instanceRefs}
mc := newTargetPoolMetricContext("add_instances", region)
op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}
// RemoveInstancesToTargetPool removes instances by link to the TargetPool
func (gce *GCECloud) RemoveInstancesFromTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error {
remove := &compute.TargetPoolsRemoveInstanceRequest{Instances: instanceRefs}
mc := newTargetPoolMetricContext("remove_instances", region)
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, remove).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}

View File

@ -134,3 +134,18 @@ func equalStringSets(x, y []string) bool {
yString := sets.NewString(y...)
return xString.Equal(yString)
}
func isNotFound(err error) bool {
return isHTTPErrorCode(err, http.StatusNotFound)
}
func ignoreNotFound(err error) error {
if err == nil || isNotFound(err) {
return nil
}
return err
}
func isNotFoundOrInUse(err error) bool {
return isNotFound(err) || isInUsedByError(err)
}

View File

@ -16,11 +16,41 @@ limitations under the License.
package gce
import "k8s.io/kubernetes/pkg/cloudprovider"
import (
"fmt"
"time"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
)
func newZonesMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"zones_" + request, region, unusedMetricLabel},
}
}
// GetZone creates a cloudprovider.Zone of the current zone and region
func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
return cloudprovider.Zone{
FailureDomain: gce.localZone,
Region: gce.region,
}, nil
}
// ListZonesInRegion returns all zones in a GCP region
func (gce *GCECloud) ListZonesInRegion(region string) ([]*compute.Zone, error) {
mc := newZonesMetricContext("list", region)
filter := fmt.Sprintf("region eq %v", gce.getRegionLink(region))
list, err := gce.service.Zones.List(gce.projectID).Filter(filter).Do()
if err != nil {
return nil, mc.Observe(err)
}
return list.Items, mc.Observe(err)
}
func (gce *GCECloud) getRegionLink(region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%v/regions/%v", gce.projectID, region)
}

View File

@ -717,9 +717,10 @@ func (cont *GCEIngressController) Init() {
// invoking deleteStaticIPs.
func (cont *GCEIngressController) CreateStaticIP(name string) string {
gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud)
ip, err := gceCloud.ReserveGlobalStaticIP(name, "")
addr := &compute.Address{Name: name}
ip, err := gceCloud.ReserveGlobalAddress(addr)
if err != nil {
if delErr := gceCloud.DeleteGlobalStaticIP(name); delErr != nil {
if delErr := gceCloud.DeleteGlobalAddress(name); delErr != nil {
if cont.isHTTPErrorCode(delErr, http.StatusNotFound) {
Logf("Static ip with name %v was not allocated, nothing to delete", name)
} else {

View File

@ -5167,20 +5167,15 @@ func CleanupGCEResources(loadBalancerName string) (retErr error) {
!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
retErr = err
}
if err := gceCloud.DeleteForwardingRule(loadBalancerName); err != nil &&
if err := gceCloud.DeleteRegionForwardingRule(loadBalancerName, gceCloud.Region()); err != nil &&
!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
retErr = fmt.Errorf("%v\n%v", retErr, err)
}
if err := gceCloud.DeleteGlobalStaticIP(loadBalancerName); err != nil &&
if err := gceCloud.DeleteRegionAddress(loadBalancerName, gceCloud.Region()); err != nil &&
!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
retErr = fmt.Errorf("%v\n%v", retErr, err)
}
// This function shells out to gcloud, so we can't compare for NotFound errors.
// TODO: Invoke cloudprovider method directly instead.
if err := DeleteGCEStaticIP(loadBalancerName); err != nil {
Logf("%v", err)
}
var hcNames []string
hc, getErr := gceCloud.GetHttpHealthCheck(loadBalancerName)
if getErr != nil && !IsGoogleAPIHTTPErrorCode(getErr, http.StatusNotFound) {
@ -5190,7 +5185,7 @@ func CleanupGCEResources(loadBalancerName string) (retErr error) {
if hc != nil {
hcNames = append(hcNames, hc.Name)
}
if err := gceCloud.DeleteTargetPool(loadBalancerName, hcNames...); err != nil &&
if err := gceCloud.DeleteExternalTargetPoolAndChecks(loadBalancerName, gceCloud.Region(), hcNames...); err != nil &&
!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
retErr = fmt.Errorf("%v\n%v", retErr, err)
}