Correctly support updates in EnsureTCPLoadBalancer for GCE.

Previously we'd just tear everything down and recreate it, which makes
for a pretty bad experience because it causes downtime whenever the
service controller restarts and has to make sure everything is in the
desired state.

This adds more code than I'd prefer, but makes it much cleaner and more
organized than it was before, in my opinion. I didn't bother
parallelizing anything because it's complex enough as it is, right now.

It's consistently passing the existing e2es and worked when I tested
manually, but this could definitely use additional e2e tests and/or some
serious refactoring to make real unit tests feasible. I'll follow up
with one or two e2e tests that make sense (updating an LB or killing the
controller manager, perhaps).
This commit is contained in:
Alex Robinson 2015-10-07 08:20:26 +00:00
parent 7a33a4b0e9
commit 429fa7a378
2 changed files with 331 additions and 210 deletions

View File

@ -22,6 +22,7 @@ import (
"net"
"net/http"
"path"
"sort"
"strconv"
"strings"
"time"
@ -44,9 +45,16 @@ import (
const (
ProviderName = "gce"
)
const k8sNodeRouteTag = "k8s-node-route"
k8sNodeRouteTag = "k8s-node-route"
// AffinityTypeNone - no session affinity.
gceAffinityTypeNone = "None"
// AffinityTypeClientIP - affinity based on Client IP.
gceAffinityTypeClientIP = "CLIENT_IP"
// AffinityTypeClientIPProto - affinity based on Client IP and port.
gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO"
)
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
@ -232,38 +240,6 @@ func hostURLToComparablePath(hostURL string) string {
return hostURL[idx:]
}
// Session Affinity Type string
type GCEAffinityType string
const (
// AffinityTypeNone - no session affinity.
GCEAffinityTypeNone GCEAffinityType = "None"
// AffinityTypeClientIP is the Client IP based.
GCEAffinityTypeClientIP GCEAffinityType = "CLIENT_IP"
// AffinityTypeClientIP is the Client IP based.
GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO"
)
func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, gce.zone, host))
}
pool := &compute.TargetPool{
Name: name,
Instances: instances,
SessionAffinity: string(affinityType),
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil {
return err
}
if err = gce.waitForRegionOp(op, region); err != nil {
return err
}
return nil
}
func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
}
@ -333,114 +309,150 @@ func isHTTPErrorCode(err error, code int) bool {
return ok && apiErr.Code == code
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType api.ServiceAffinity) GCEAffinityType {
switch affinityType {
case api.ServiceAffinityClientIP:
return GCEAffinityTypeClientIP
case api.ServiceAffinityNone:
return GCEAffinityTypeNone
default:
glog.Errorf("Unexpected affinity type: %v", affinityType)
return GCEAffinityTypeNone
}
}
func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}
func (gce *GCECloud) getAddress(name, region string) (string, bool, error) {
address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
if err == nil {
return address.Address, true, nil
}
if isHTTPErrorCode(err, http.StatusNotFound) {
return "", false, nil
}
return "", false, err
}
func ownsAddress(ip net.IP, addrs []*compute.Address) bool {
ipStr := ip.String()
for _, addr := range addrs {
if addr.Address == ipStr {
return true
}
}
return false
}
// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
// owned by the project and available to be used, and use them if they are.
func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
if len(hosts) == 0 {
return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts")
}
if loadBalancerIP == nil {
glog.V(2).Info("Checking if the static IP address already exists: %s", name)
address, exists, err := gce.getAddress(name, region)
if err != nil {
return nil, fmt.Errorf("error looking for gce address: %v", err)
}
if !exists {
// Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't.
// However, quota is limited to only 7 addresses per region by default.
op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do()
if err != nil {
return nil, fmt.Errorf("error creating gce static IP address: %v", err)
}
if err := gce.waitForRegionOp(op, region); err != nil {
return nil, fmt.Errorf("error waiting for gce static IP address to complete: %v", err)
}
address, exists, err = gce.getAddress(name, region)
if err != nil {
return nil, fmt.Errorf("error re-getting gce static IP address: %v", err)
}
if !exists {
return nil, fmt.Errorf("failed to re-get gce static IP address for %s", name)
}
}
if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil {
return nil, fmt.Errorf("error parsing gce static IP address: %s", address)
}
} else {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
if err != nil {
return nil, fmt.Errorf("failed to list gce IP addresses: %v", err)
}
if !ownsAddress(loadBalancerIP, addresses.Items) {
return nil, fmt.Errorf("this gce project don't own the IP address: %s", loadBalancerIP.String())
}
}
glog.V(2).Info("Checking if load balancer already exists: %s", name)
_, exists, err := gce.GetTCPLoadBalancer(name, region)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(name, region, requestedIP, ports)
if err != nil {
return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err)
return nil, err
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := gce.EnsureTCPLoadBalancerDeleted(name, region)
if err != nil {
return nil, fmt.Errorf("error deleting existing GCE load balancer: %v", err)
// If a specific IP address has been requested, we have to respect the user's
// request and use that IP. If the forwarding rule was already using a
// different IP, then we have to make sure to delete it once it's no longer
// attached to the forwarding rule to avoid leaking it.
ipAddress := fwdRuleIP
deleteFwdRuleIP := false
if requestedIP != nil && requestedIP.String() != fwdRuleIP {
ipAddress = requestedIP.String()
deleteFwdRuleIP = true
}
// Make sure we have an IP address to use if we weren't able to pull it from
// an existing forwarding rule. Note that we absolutely do not ever delete an
// IP address in this function -- we always reuse the old one if possible.
//
// We use static IP addresses to ensure that we can replace a load balancer's
// other components without changing the address a service is reachable on.
// Note that while static addresses that _aren't_ in use cost the user money,
// addresses that _are_ in use cost nothing.
// However, quota is limited to only 7 addresses per region by default.
if ipAddress == "" {
if requestedIP != nil {
if err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil {
return nil, err
}
ipAddress = requestedIP.String()
} else {
ipAddress, err = gce.createStaticIP(name, region)
if err != nil {
return nil, err
}
}
}
err = gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
// Deal with the firewall next. The reason we do this here rather than last
// is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getTCPLoadBalancer checks for.
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports)
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
}
glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err)
return nil, err
}
if firewallNeedsUpdate {
// Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated.
if firewallExists {
if err := gce.updateFirewall(name, region, ipAddress, ports, hosts); err != nil {
return nil, err
}
} else {
if err := gce.createFirewall(name, region, ipAddress, ports, hosts); err != nil {
return nil, err
}
}
}
tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(name, region, affinityType)
if err != nil {
return nil, err
}
// Now we get to some slightly more interesting logic.
// First, neither target pools nor forwarding rules can be updated in place -
// they have to be deleted and recreated.
// Second, forwarding rules are layered on top of target pools in that you
// can't delete a target pool that's currently in use by a forwarding rule.
// Thus, we have to tear down the forwarding rule if either it or the target
// pool needs to be updated.
if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) {
if err := gce.deleteForwardingRule(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err)
}
if deleteFwdRuleIP {
// Delete the old IP to avoid leaking it, since we're going to be using
// the user-requested IP when recreating the forwarding rule below.
gce.deleteStaticIP(name, region)
}
}
if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err)
}
}
// Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new).
if tpNeedsUpdate {
if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
}
}
if tpNeedsUpdate || fwdRuleNeedsUpdate {
if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err)
}
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: ipAddress}}
return status, nil
}
func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP net.IP, ports []*api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, "", nil
}
return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
}
if requestedIP != nil && requestedIP.String() != fwd.IPAddress {
return true, true, fwd.IPAddress, nil
}
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return false, false, "", err
}
if portRange != fwd.PortRange {
return true, true, fwd.IPAddress, nil
}
return true, false, fwd.IPAddress, nil
}
func loadBalancerPortRange(ports []*api.ServicePort) (string, error) {
if len(ports) == 0 {
return nil, fmt.Errorf("no ports specified for GCE load balancer")
return "", fmt.Errorf("no ports specified for GCE load balancer")
}
minPort := 65536
maxPort := 0
@ -452,42 +464,185 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
maxPort = ports[i].Port
}
}
return fmt.Sprintf("%d-%d", minPort, maxPort), nil
}
// Doesn't check whether the hosts have changed, since host updating is handled
// separately.
func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType api.ServiceAffinity) (exists bool, needsUpdate bool, err error) {
tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
if translateAffinityType(affinityType) != tp.SessionAffinity {
return true, true, nil
}
return true, false, nil
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType api.ServiceAffinity) string {
switch affinityType {
case api.ServiceAffinityClientIP:
return gceAffinityTypeClientIP
case api.ServiceAffinityNone:
return gceAffinityTypeNone
default:
glog.Errorf("Unexpected affinity type: %v", affinityType)
return gceAffinityTypeNone
}
}
func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort) (exists bool, needsUpdate bool, err error) {
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
if fw.Description != makeFirewallDescription(ipAddress) {
return true, true, nil
}
// Make sure the allowed ports match
if len(fw.Allowed) != 1 {
return true, true, nil
}
if fw.Allowed[0].IPProtocol != "tcp" {
return true, true, nil
}
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
}
if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil
}
return true, false, nil
}
func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}
func makeFirewallDescription(ipAddress string) string {
return fmt.Sprintf("KubernetesAutoGenerated_OnlyAllowTrafficForDestinationIP_%s", ipAddress)
}
func slicesEqual(x, y []string) bool {
if len(x) != len(y) {
return false
}
sort.Strings(x)
sort.Strings(y)
for i := range x {
if x[i] != y[i] {
return false
}
}
return true
}
func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports []*api.ServicePort) error {
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return err
}
req := &compute.ForwardingRule{
Name: name,
IPAddress: loadBalancerIP.String(),
IPAddress: ipAddress,
IPProtocol: "TCP",
PortRange: fmt.Sprintf("%d-%d", minPort, maxPort),
PortRange: portRange,
Target: gce.targetPoolURL(name, region),
}
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
return err
}
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
return err
}
}
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
return nil, err
}
return nil
}
func (gce *GCECloud) createTargetPool(name, region string, hosts []string, affinityType api.ServiceAffinity) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, gce.zone, host))
}
pool := &compute.TargetPool{
Name: name,
Instances: instances,
SessionAffinity: translateAffinityType(affinityType),
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) createFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error {
firewall, err := gce.firewallObject(name, region, ipAddress, ports, hosts)
if err != nil {
return err
}
op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForGlobalOp(op)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) updateFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error {
firewall, err := gce.firewallObject(name, region, ipAddress, ports, hosts)
if err != nil {
return err
}
op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForGlobalOp(op)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
}
hostTags, err := gce.computeHostTags(hosts)
if err != nil {
return nil, err
}
firewall := &compute.Firewall{
Name: makeFirewallName(name),
Description: fmt.Sprintf("KubernetesAutoGenerated_OnlyAllowTrafficForDestinationIP_%s", fwd.IPAddress),
Description: makeFirewallDescription(ipAddress),
Network: gce.networkURL,
SourceRanges: []string{"0.0.0.0/0"},
TargetTags: hostTags,
@ -498,16 +653,7 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
},
},
}
if op, err = gce.service.Firewalls.Insert(gce.projectID, firewall).Do(); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
}
if err = gce.waitForGlobalOp(op); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}}
return status, nil
return firewall, nil
}
// We grab all tags from all instances being added to the pool.
@ -550,6 +696,42 @@ func (gce *GCECloud) computeHostTags(hosts []string) ([]string, error) {
return tags.List(), nil
}
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) error {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
if err != nil {
return fmt.Errorf("failed to list gce IP addresses: %v", err)
}
for _, addr := range addresses.Items {
if addr.Address == ipAddress {
// This project does own the address, so return success.
return nil
}
}
return fmt.Errorf("this gce project doesn't own the IP address: %s", ipAddress)
}
func (gce *GCECloud) createStaticIP(name, region string) (ipAddress string, err error) {
// If the address already exists, this will harmlessly continue on to getting
// the address in the next section.
op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", fmt.Errorf("error creating gce static IP address: %v", err)
}
if op != nil {
err := gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", fmt.Errorf("error waiting for gce static IP address to be created: %v", err)
}
}
// We have to get the address to know which IP was allocated for us.
address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
if err != nil {
return "", fmt.Errorf("error re-getting gce static IP address: %v", err)
}
return address.Address, nil
}
// UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer.
func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()

View File

@ -16,68 +16,7 @@ limitations under the License.
package gce_cloud
import (
"net"
"testing"
compute "google.golang.org/api/compute/v1"
)
func TestOwnsAddress(t *testing.T) {
tests := []struct {
ip net.IP
addrs []*compute.Address
expectOwn bool
}{
{
ip: net.ParseIP("1.2.3.4"),
addrs: []*compute.Address{},
expectOwn: false,
},
{
ip: net.ParseIP("1.2.3.4"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: false,
},
{
ip: net.ParseIP("2.3.4.5"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
{
ip: net.ParseIP("2.3.4.6"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
{
ip: net.ParseIP("2.3.4.7"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
}
for _, test := range tests {
own := ownsAddress(test.ip, test.addrs)
if own != test.expectOwn {
t.Errorf("expected: %v, got %v for %v", test.expectOwn, own, test)
}
}
}
import "testing"
func TestGetRegion(t *testing.T) {
gce := &GCECloud{