diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 3ca5708ecf5..9e5cd5c5607 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -22,6 +22,7 @@ import ( "net" "net/http" "path" + "sort" "strconv" "strings" "time" @@ -44,9 +45,16 @@ import ( const ( ProviderName = "gce" -) -const k8sNodeRouteTag = "k8s-node-route" + k8sNodeRouteTag = "k8s-node-route" + + // gceAffinityTypeNone - no session affinity (must match the API's "NONE" spelling, since it is compared against TargetPool.SessionAffinity). + gceAffinityTypeNone = "NONE" + // gceAffinityTypeClientIP - affinity based on Client IP. + gceAffinityTypeClientIP = "CLIENT_IP" + // gceAffinityTypeClientIPProto - affinity based on Client IP and protocol. + gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO" +) // GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine. type GCECloud struct { @@ -232,38 +240,6 @@ func hostURLToComparablePath(hostURL string) string { return hostURL[idx:] } -// Session Affinity Type string -type GCEAffinityType string - -const ( - // AffinityTypeNone - no session affinity. - GCEAffinityTypeNone GCEAffinityType = "None" - // AffinityTypeClientIP is the Client IP based. - GCEAffinityTypeClientIP GCEAffinityType = "CLIENT_IP" - // AffinityTypeClientIP is the Client IP based. 
- GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO" -) - -func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) error { - var instances []string - for _, host := range hosts { - instances = append(instances, makeHostURL(gce.projectID, gce.zone, host)) - } - pool := &compute.TargetPool{ - Name: name, - Instances: instances, - SessionAffinity: string(affinityType), - } - op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do() - if err != nil { - return err - } - if err = gce.waitForRegionOp(op, region); err != nil { - return err - } - return nil -} - func (gce *GCECloud) targetPoolURL(name, region string) string { return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) } @@ -333,114 +309,150 @@ func isHTTPErrorCode(err error, code int) bool { return ok && apiErr.Code == code } -// translate from what K8s supports to what the cloud provider supports for session affinity. 
-func translateAffinityType(affinityType api.ServiceAffinity) GCEAffinityType { - switch affinityType { - case api.ServiceAffinityClientIP: - return GCEAffinityTypeClientIP - case api.ServiceAffinityNone: - return GCEAffinityTypeNone - default: - glog.Errorf("Unexpected affinity type: %v", affinityType) - return GCEAffinityTypeNone - } -} - -func makeFirewallName(name string) string { - return fmt.Sprintf("k8s-fw-%s", name) -} - -func (gce *GCECloud) getAddress(name, region string) (string, bool, error) { - address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do() - if err == nil { - return address.Address, true, nil - } - if isHTTPErrorCode(err, http.StatusNotFound) { - return "", false, nil - } - return "", false, err -} - -func ownsAddress(ip net.IP, addrs []*compute.Address) bool { - ipStr := ip.String() - for _, addr := range addrs { - if addr.Address == ipStr { - return true - } - } - return false -} - // EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. -// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're -// owned by the project and available to be used, and use them if they are. -func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { +// Our load balancers in GCE consist of four separate GCE resources - a static +// IP address, a firewall rule, a target pool, and a forwarding rule. This +// function has to manage all of them. +// Due to an interesting series of design decisions, this handles both creating +// new load balancers and updating existing load balancers, recognizing when +// each is needed. 
+func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { if len(hosts) == 0 { return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts") } - if loadBalancerIP == nil { - glog.V(2).Info("Checking if the static IP address already exists: %s", name) - address, exists, err := gce.getAddress(name, region) - if err != nil { - return nil, fmt.Errorf("error looking for gce address: %v", err) - } - if !exists { - // Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't. - // However, quota is limited to only 7 addresses per region by default. - op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do() - if err != nil { - return nil, fmt.Errorf("error creating gce static IP address: %v", err) - } - if err := gce.waitForRegionOp(op, region); err != nil { - return nil, fmt.Errorf("error waiting for gce static IP address to complete: %v", err) - } - address, exists, err = gce.getAddress(name, region) - if err != nil { - return nil, fmt.Errorf("error re-getting gce static IP address: %v", err) - } - if !exists { - return nil, fmt.Errorf("failed to re-get gce static IP address for %s", name) - } - } - if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil { - return nil, fmt.Errorf("error parsing gce static IP address: %s", address) - } - } else { - addresses, err := gce.service.Addresses.List(gce.projectID, region).Do() - if err != nil { - return nil, fmt.Errorf("failed to list gce IP addresses: %v", err) - } - if !ownsAddress(loadBalancerIP, addresses.Items) { - return nil, fmt.Errorf("this gce project don't own the IP address: %s", loadBalancerIP.String()) - } - } - - glog.V(2).Info("Checking if load balancer already exists: %s", name) - _, exists, err := gce.GetTCPLoadBalancer(name, region) + // Check if the forwarding rule exists, and if so, 
what its IP is. + fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(name, region, requestedIP, ports) if err != nil { - return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err) + return nil, err } - // TODO: Implement a more efficient update strategy for common changes than delete & create - // In particular, if we implement hosts update, we can get rid of UpdateHosts - if exists { - err := gce.EnsureTCPLoadBalancerDeleted(name, region) - if err != nil { - return nil, fmt.Errorf("error deleting existing GCE load balancer: %v", err) + // If a specific IP address has been requested, we have to respect the user's + // request and use that IP. If the forwarding rule was already using a + // different IP, then we have to make sure to delete it once it's no longer + // attached to the forwarding rule to avoid leaking it. + ipAddress := fwdRuleIP + deleteFwdRuleIP := false + if requestedIP != nil && requestedIP.String() != fwdRuleIP { + ipAddress = "" // resolved below, only after verifying the project owns requestedIP + deleteFwdRuleIP = true + } + + // Make sure we have a usable IP address: verify ownership of a requested IP, + // or create the static IP if no existing forwarding rule supplied one. Note that + // we never delete an IP address here unless the user requested a different one. + // + // We use static IP addresses to ensure that we can replace a load balancer's + // other components without changing the address a service is reachable on. + // Note that while static addresses that _aren't_ in use cost the user money, + // addresses that _are_ in use cost nothing. + // However, quota is limited to only 7 addresses per region by default. 
+ if ipAddress == "" { + if requestedIP != nil { + if err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil { + return nil, err + } + ipAddress = requestedIP.String() + } else { + ipAddress, err = gce.createStaticIP(name, region) + if err != nil { + return nil, err + } } } - err = gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) + // Deal with the firewall next. The reason we do this here rather than last + // is because the forwarding rule is used as the indicator that the load + // balancer is fully created - it's what getTCPLoadBalancer checks for. + firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports) if err != nil { - if !isHTTPErrorCode(err, http.StatusConflict) { - return nil, err - } - glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err) + return nil, err } + if firewallNeedsUpdate { + // Unlike forwarding rules and target pools, firewalls can be updated + // without needing to be deleted and recreated. + if firewallExists { + if err := gce.updateFirewall(name, region, ipAddress, ports, hosts); err != nil { + return nil, err + } + } else { + if err := gce.createFirewall(name, region, ipAddress, ports, hosts); err != nil { + return nil, err + } + } + } + + tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(name, region, affinityType) + if err != nil { + return nil, err + } + + // Now we get to some slightly more interesting logic. + // First, neither target pools nor forwarding rules can be updated in place - + // they have to be deleted and recreated. + // Second, forwarding rules are layered on top of target pools in that you + // can't delete a target pool that's currently in use by a forwarding rule. + // Thus, we have to tear down the forwarding rule if either it or the target + // pool needs to be updated. 
+ if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) { + if err := gce.deleteForwardingRule(name, region); err != nil { + return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err) + } + if deleteFwdRuleIP { + // Delete the old IP to avoid leaking it, since we're going to be using + // the user-requested IP when recreating the forwarding rule below. + gce.deleteStaticIP(name, region) + } + } + if tpExists && tpNeedsUpdate { + if err := gce.deleteTargetPool(name, region); err != nil { + return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err) + } + } + + // Once we've deleted the resources (if necessary), build them back up (or for + // the first time if they're new). + if tpNeedsUpdate { + if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil { + return nil, fmt.Errorf("failed to create target pool %s: %v", name, err) + } + } + if tpNeedsUpdate || fwdRuleNeedsUpdate { + if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil { + return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err) + } + } + + status := &api.LoadBalancerStatus{} + status.Ingress = []api.LoadBalancerIngress{{IP: ipAddress}} + return status, nil +} + +func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP net.IP, ports []*api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) { + fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() + if err != nil { + if isHTTPErrorCode(err, http.StatusNotFound) { + return false, true, "", nil + } + return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err) + } + if requestedIP != nil && requestedIP.String() != fwd.IPAddress { + return true, true, fwd.IPAddress, nil + } + portRange, err := loadBalancerPortRange(ports) + if err != nil { + return false, false, "", err + } + if portRange 
!= fwd.PortRange { + return true, true, fwd.IPAddress, nil + } + return true, false, fwd.IPAddress, nil +} + +func loadBalancerPortRange(ports []*api.ServicePort) (string, error) { if len(ports) == 0 { - return nil, fmt.Errorf("no ports specified for GCE load balancer") + return "", fmt.Errorf("no ports specified for GCE load balancer") } minPort := 65536 maxPort := 0 @@ -452,42 +464,185 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n maxPort = ports[i].Port } } + return fmt.Sprintf("%d-%d", minPort, maxPort), nil +} + +// Doesn't check whether the hosts have changed, since host updating is handled +// separately. +func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType api.ServiceAffinity) (exists bool, needsUpdate bool, err error) { + tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() + if err != nil { + if isHTTPErrorCode(err, http.StatusNotFound) { + return false, true, nil + } + return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err) + } + if translateAffinityType(affinityType) != tp.SessionAffinity { + return true, true, nil + } + return true, false, nil +} + +// translate from what K8s supports to what the cloud provider supports for session affinity. 
+func translateAffinityType(affinityType api.ServiceAffinity) string { + switch affinityType { + case api.ServiceAffinityClientIP: + return gceAffinityTypeClientIP + case api.ServiceAffinityNone: + return gceAffinityTypeNone + default: + glog.Errorf("Unexpected affinity type: %v", affinityType) + return gceAffinityTypeNone + } +} + +func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort) (exists bool, needsUpdate bool, err error) { + fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do() + if err != nil { + if isHTTPErrorCode(err, http.StatusNotFound) { + return false, true, nil + } + return false, false, fmt.Errorf("error getting load balancer's firewall: %v", err) + } + if fw.Description != makeFirewallDescription(ipAddress) { + return true, true, nil + } + // Make sure the allowed ports match + if len(fw.Allowed) != 1 { + return true, true, nil + } + if fw.Allowed[0].IPProtocol != "tcp" { + return true, true, nil + } + allowedPorts := make([]string, len(ports)) + for ix := range ports { + allowedPorts[ix] = strconv.Itoa(ports[ix].Port) + } + if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) { + return true, true, nil + } + return true, false, nil +} + +func makeFirewallName(name string) string { + return fmt.Sprintf("k8s-fw-%s", name) +} + +func makeFirewallDescription(ipAddress string) string { + return fmt.Sprintf("KubernetesAutoGenerated_OnlyAllowTrafficForDestinationIP_%s", ipAddress) +} + +func slicesEqual(x, y []string) bool { + if len(x) != len(y) { + return false + } + sort.Strings(x) + sort.Strings(y) + for i := range x { + if x[i] != y[i] { + return false + } + } + return true +} + +func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports []*api.ServicePort) error { + portRange, err := loadBalancerPortRange(ports) + if err != nil { + return err + } req := &compute.ForwardingRule{ Name: name, - IPAddress: loadBalancerIP.String(), + IPAddress: ipAddress, 
IPProtocol: "TCP", - PortRange: fmt.Sprintf("%d-%d", minPort, maxPort), + PortRange: portRange, Target: gce.targetPoolURL(name, region), } op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return nil, err + return err } if op != nil { err = gce.waitForRegionOp(op, region) if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return nil, err + return err } } - fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() - if err != nil { - return nil, err - } + return nil +} +func (gce *GCECloud) createTargetPool(name, region string, hosts []string, affinityType api.ServiceAffinity) error { + var instances []string + for _, host := range hosts { + instances = append(instances, makeHostURL(gce.projectID, gce.zone, host)) + } + pool := &compute.TargetPool{ + Name: name, + Instances: instances, + SessionAffinity: translateAffinityType(affinityType), + } + op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do() + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return err + } + if op != nil { + err = gce.waitForRegionOp(op, region) + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return err + } + } + return nil +} + +func (gce *GCECloud) createFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error { + firewall, err := gce.firewallObject(name, region, ipAddress, ports, hosts) + if err != nil { + return err + } + op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do() + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return err + } + if op != nil { + err = gce.waitForGlobalOp(op) + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return err + } + } + return nil +} + +func (gce *GCECloud) updateFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error { + firewall, err := gce.firewallObject(name, 
region, ipAddress, ports, hosts) + if err != nil { + return err + } + op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do() + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return err + } + if op != nil { + err = gce.waitForGlobalOp(op) + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return err + } + } + return nil +} + +func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) (*compute.Firewall, error) { allowedPorts := make([]string, len(ports)) for ix := range ports { allowedPorts[ix] = strconv.Itoa(ports[ix].Port) } - hostTags, err := gce.computeHostTags(hosts) if err != nil { return nil, err } - firewall := &compute.Firewall{ Name: makeFirewallName(name), - Description: fmt.Sprintf("KubernetesAutoGenerated_OnlyAllowTrafficForDestinationIP_%s", fwd.IPAddress), + Description: makeFirewallDescription(ipAddress), Network: gce.networkURL, SourceRanges: []string{"0.0.0.0/0"}, TargetTags: hostTags, @@ -498,16 +653,7 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n }, }, } - if op, err = gce.service.Firewalls.Insert(gce.projectID, firewall).Do(); err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return nil, err - } - if err = gce.waitForGlobalOp(op); err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return nil, err - } - - status := &api.LoadBalancerStatus{} - status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}} - return status, nil + return firewall, nil } // We grab all tags from all instances being added to the pool. 
@@ -550,6 +696,42 @@ func (gce *GCECloud) computeHostTags(hosts []string) ([]string, error) { return tags.List(), nil } +func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) error { + addresses, err := gce.service.Addresses.List(gce.projectID, region).Do() + if err != nil { + return fmt.Errorf("failed to list gce IP addresses: %v", err) + } + for _, addr := range addresses.Items { + if addr.Address == ipAddress { + // This project does own the address, so return success. + return nil + } + } + return fmt.Errorf("this gce project doesn't own the IP address: %s", ipAddress) +} + +func (gce *GCECloud) createStaticIP(name, region string) (ipAddress string, err error) { + // If the address already exists, this will harmlessly continue on to getting + // the address in the next section. + op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do() + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return "", fmt.Errorf("error creating gce static IP address: %v", err) + } + if op != nil { + err = gce.waitForRegionOp(op, region) + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return "", fmt.Errorf("error waiting for gce static IP address to be created: %v", err) + } + } + + // We have to get the address to know which IP was allocated for us. + address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do() + if err != nil { + return "", fmt.Errorf("error re-getting gce static IP address: %v", err) + } + return address.Address, nil +} + // UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer. 
func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() diff --git a/pkg/cloudprovider/providers/gce/gce_test.go b/pkg/cloudprovider/providers/gce/gce_test.go index 5bf6eb83517..0c2504c99c3 100644 --- a/pkg/cloudprovider/providers/gce/gce_test.go +++ b/pkg/cloudprovider/providers/gce/gce_test.go @@ -16,68 +16,7 @@ limitations under the License. package gce_cloud -import ( - "net" - "testing" - - compute "google.golang.org/api/compute/v1" -) - -func TestOwnsAddress(t *testing.T) { - tests := []struct { - ip net.IP - addrs []*compute.Address - expectOwn bool - }{ - { - ip: net.ParseIP("1.2.3.4"), - addrs: []*compute.Address{}, - expectOwn: false, - }, - { - ip: net.ParseIP("1.2.3.4"), - addrs: []*compute.Address{ - {Address: "2.3.4.5"}, - {Address: "2.3.4.6"}, - {Address: "2.3.4.7"}, - }, - expectOwn: false, - }, - { - ip: net.ParseIP("2.3.4.5"), - addrs: []*compute.Address{ - {Address: "2.3.4.5"}, - {Address: "2.3.4.6"}, - {Address: "2.3.4.7"}, - }, - expectOwn: true, - }, - { - ip: net.ParseIP("2.3.4.6"), - addrs: []*compute.Address{ - {Address: "2.3.4.5"}, - {Address: "2.3.4.6"}, - {Address: "2.3.4.7"}, - }, - expectOwn: true, - }, - { - ip: net.ParseIP("2.3.4.7"), - addrs: []*compute.Address{ - {Address: "2.3.4.5"}, - {Address: "2.3.4.6"}, - {Address: "2.3.4.7"}, - }, - expectOwn: true, - }, - } - for _, test := range tests { - own := ownsAddress(test.ip, test.addrs) - if own != test.expectOwn { - t.Errorf("expected: %v, got %v for %v", test.expectOwn, own, test) - } - } -} +import "testing" func TestGetRegion(t *testing.T) { gce := &GCECloud{