Merge pull request #15286 from a-robinson/ensure

Correctly support updates in EnsureTCPLoadBalancer for GCE.
This commit is contained in:
Alex Robinson 2015-10-09 17:34:40 -07:00
commit 3bbfe48e0a
2 changed files with 350 additions and 219 deletions

View File

@ -22,6 +22,7 @@ import (
"net"
"net/http"
"path"
"sort"
"strconv"
"strings"
"time"
@ -44,9 +45,16 @@ import (
const (
ProviderName = "gce"
)
const k8sNodeRouteTag = "k8s-node-route"
k8sNodeRouteTag = "k8s-node-route"
// AffinityTypeNone - no session affinity.
gceAffinityTypeNone = "None"
// AffinityTypeClientIP - affinity based on Client IP.
gceAffinityTypeClientIP = "CLIENT_IP"
// AffinityTypeClientIPProto - affinity based on Client IP and port.
gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO"
)
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
@ -232,38 +240,6 @@ func hostURLToComparablePath(hostURL string) string {
return hostURL[idx:]
}
// Session Affinity Type string
type GCEAffinityType string
const (
// AffinityTypeNone - no session affinity.
GCEAffinityTypeNone GCEAffinityType = "None"
// AffinityTypeClientIP is the Client IP based.
GCEAffinityTypeClientIP GCEAffinityType = "CLIENT_IP"
// AffinityTypeClientIP is the Client IP based.
GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO"
)
func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, gce.zone, host))
}
pool := &compute.TargetPool{
Name: name,
Instances: instances,
SessionAffinity: string(affinityType),
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil {
return err
}
if err = gce.waitForRegionOp(op, region); err != nil {
return err
}
return nil
}
func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
}
@ -333,114 +309,157 @@ func isHTTPErrorCode(err error, code int) bool {
return ok && apiErr.Code == code
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType api.ServiceAffinity) GCEAffinityType {
switch affinityType {
case api.ServiceAffinityClientIP:
return GCEAffinityTypeClientIP
case api.ServiceAffinityNone:
return GCEAffinityTypeNone
default:
glog.Errorf("Unexpected affinity type: %v", affinityType)
return GCEAffinityTypeNone
}
}
func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}
func (gce *GCECloud) getAddress(name, region string) (string, bool, error) {
address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
if err == nil {
return address.Address, true, nil
}
if isHTTPErrorCode(err, http.StatusNotFound) {
return "", false, nil
}
return "", false, err
}
func ownsAddress(ip net.IP, addrs []*compute.Address) bool {
ipStr := ip.String()
for _, addr := range addrs {
if addr.Address == ipStr {
return true
}
}
return false
}
// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
// owned by the project and available to be used, and use them if they are.
func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
if len(hosts) == 0 {
return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts")
}
if loadBalancerIP == nil {
glog.V(2).Info("Checking if the static IP address already exists: %s", name)
address, exists, err := gce.getAddress(name, region)
if err != nil {
return nil, fmt.Errorf("error looking for gce address: %v", err)
}
if !exists {
// Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't.
// However, quota is limited to only 7 addresses per region by default.
op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do()
if err != nil {
return nil, fmt.Errorf("error creating gce static IP address: %v", err)
}
if err := gce.waitForRegionOp(op, region); err != nil {
return nil, fmt.Errorf("error waiting for gce static IP address to complete: %v", err)
}
address, exists, err = gce.getAddress(name, region)
if err != nil {
return nil, fmt.Errorf("error re-getting gce static IP address: %v", err)
}
if !exists {
return nil, fmt.Errorf("failed to re-get gce static IP address for %s", name)
}
}
if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil {
return nil, fmt.Errorf("error parsing gce static IP address: %s", address)
}
} else {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
if err != nil {
return nil, fmt.Errorf("failed to list gce IP addresses: %v", err)
}
if !ownsAddress(loadBalancerIP, addresses.Items) {
return nil, fmt.Errorf("this gce project don't own the IP address: %s", loadBalancerIP.String())
}
}
glog.V(2).Info("Checking if load balancer already exists: %s", name)
_, exists, err := gce.GetTCPLoadBalancer(name, region)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(name, region, requestedIP, ports)
if err != nil {
return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err)
return nil, err
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := gce.EnsureTCPLoadBalancerDeleted(name, region)
if err != nil {
return nil, fmt.Errorf("error deleting existing GCE load balancer: %v", err)
}
}
err = gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
// Make sure we know which IP address will be used and have properly reserved
// it as static before moving forward with the rest of our operations.
//
// We use static IP addresses when updating a load balancer to ensure that we
// can replace the load balancer's other components without changing the
// address its service is reachable on. We do it this way rather than always
// keeping the static IP around even though this is more complicated because
// it makes it less likely that we'll run into quota issues. Only 7 static
// IP addresses are allowed per region by default.
//
// We could let an IP be allocated for us when the forwarding rule is created,
// but we need the IP to set up the firewall rule, and we want to keep the
// forwarding rule creation as the last thing that needs to be done in this
// function in order to maintain the invariant that "if the forwarding rule
// exists, the LB has been fully created".
ipAddress := ""
if requestedIP != nil {
// If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway).
if err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil {
return nil, err
}
ipAddress = requestedIP.String()
} else {
// This will either allocate a new static IP if the forwarding rule didn't
// already have an IP, or it will promote the forwarding rule's IP from
// ephemeral to static.
ipAddress, err = gce.createOrPromoteStaticIP(name, region, fwdRuleIP)
if err != nil {
return nil, err
}
glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err)
}
// Deal with the firewall next. The reason we do this here rather than last
// is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getTCPLoadBalancer checks for.
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports)
if err != nil {
return nil, err
}
if firewallNeedsUpdate {
// Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated.
if firewallExists {
if err := gce.updateFirewall(name, region, ipAddress, ports, hosts); err != nil {
return nil, err
}
} else {
if err := gce.createFirewall(name, region, ipAddress, ports, hosts); err != nil {
return nil, err
}
}
}
tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(name, region, affinityType)
if err != nil {
return nil, err
}
// Now we get to some slightly more interesting logic.
// First, neither target pools nor forwarding rules can be updated in place -
// they have to be deleted and recreated.
// Second, forwarding rules are layered on top of target pools in that you
// can't delete a target pool that's currently in use by a forwarding rule.
// Thus, we have to tear down the forwarding rule if either it or the target
// pool needs to be updated.
if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) {
if err := gce.deleteForwardingRule(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err)
}
}
if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err)
}
}
// Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new).
if tpNeedsUpdate {
if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
}
}
if tpNeedsUpdate || fwdRuleNeedsUpdate {
if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err)
}
}
// Now that we're done operating on everything, demote the static IP back to
// ephemeral to avoid taking up the user's static IP quota.
if err := gce.deleteStaticIP(name, region); err != nil {
return nil, fmt.Errorf("failed to release static IP %s after finishing update of load balancer resources: %v", err)
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: ipAddress}}
return status, nil
}
// Passing nil for requested IP is perfectly fine - it just means that no specific
// IP is being requested.
// Returns whether the forwarding rule exists, whether it needs to be updated,
// what its IP address is (if it exists), and any error we encountered.
func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP net.IP, ports []*api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, "", nil
}
return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
}
if requestedIP != nil && requestedIP.String() != fwd.IPAddress {
return true, true, fwd.IPAddress, nil
}
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return false, false, "", err
}
if portRange != fwd.PortRange {
return true, true, fwd.IPAddress, nil
}
return true, false, fwd.IPAddress, nil
}
func loadBalancerPortRange(ports []*api.ServicePort) (string, error) {
if len(ports) == 0 {
return nil, fmt.Errorf("no ports specified for GCE load balancer")
return "", fmt.Errorf("no ports specified for GCE load balancer")
}
minPort := 65536
maxPort := 0
@ -452,42 +471,182 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
maxPort = ports[i].Port
}
}
return fmt.Sprintf("%d-%d", minPort, maxPort), nil
}
// Doesn't check whether the hosts have changed, since host updating is handled
// separately.
func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType api.ServiceAffinity) (exists bool, needsUpdate bool, err error) {
tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
if translateAffinityType(affinityType) != tp.SessionAffinity {
return true, true, nil
}
return true, false, nil
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType api.ServiceAffinity) string {
switch affinityType {
case api.ServiceAffinityClientIP:
return gceAffinityTypeClientIP
case api.ServiceAffinityNone:
return gceAffinityTypeNone
default:
glog.Errorf("Unexpected affinity type: %v", affinityType)
return gceAffinityTypeNone
}
}
func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort) (exists bool, needsUpdate bool, err error) {
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
if fw.Description != makeFirewallDescription(ipAddress) {
return true, true, nil
}
if len(fw.Allowed) != 1 || fw.Allowed[0].IPProtocol != "tcp" {
return true, true, nil
}
// Make sure the allowed ports match.
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
}
if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil
}
return true, false, nil
}
func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}
func makeFirewallDescription(ipAddress string) string {
return fmt.Sprintf("KubernetesAutoGenerated_OnlyAllowTrafficForDestinationIP_%s", ipAddress)
}
func slicesEqual(x, y []string) bool {
if len(x) != len(y) {
return false
}
sort.Strings(x)
sort.Strings(y)
for i := range x {
if x[i] != y[i] {
return false
}
}
return true
}
func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports []*api.ServicePort) error {
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return err
}
req := &compute.ForwardingRule{
Name: name,
IPAddress: loadBalancerIP.String(),
IPAddress: ipAddress,
IPProtocol: "TCP",
PortRange: fmt.Sprintf("%d-%d", minPort, maxPort),
PortRange: portRange,
Target: gce.targetPoolURL(name, region),
}
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
return err
}
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
return err
}
}
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
return nil, err
}
return nil
}
func (gce *GCECloud) createTargetPool(name, region string, hosts []string, affinityType api.ServiceAffinity) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, gce.zone, host))
}
pool := &compute.TargetPool{
Name: name,
Instances: instances,
SessionAffinity: translateAffinityType(affinityType),
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) createFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error {
firewall, err := gce.firewallObject(name, region, ipAddress, ports, hosts)
if err != nil {
return err
}
op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForGlobalOp(op)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) updateFirewall(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) error {
firewall, err := gce.firewallObject(name, region, ipAddress, ports, hosts)
if err != nil {
return err
}
op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForGlobalOp(op)
if err != nil {
return err
}
}
return nil
}
func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api.ServicePort, hosts []string) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
}
hostTags, err := gce.computeHostTags(hosts)
if err != nil {
return nil, err
}
firewall := &compute.Firewall{
Name: makeFirewallName(name),
Description: fmt.Sprintf("KubernetesAutoGenerated_OnlyAllowTrafficForDestinationIP_%s", fwd.IPAddress),
Description: makeFirewallDescription(ipAddress),
Network: gce.networkURL,
SourceRanges: []string{"0.0.0.0/0"},
TargetTags: hostTags,
@ -498,16 +657,7 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
},
},
}
if op, err = gce.service.Firewalls.Insert(gce.projectID, firewall).Do(); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
}
if err = gce.waitForGlobalOp(op); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}}
return status, nil
return firewall, nil
}
// We grab all tags from all instances being added to the pool.
@ -550,6 +700,48 @@ func (gce *GCECloud) computeHostTags(hosts []string) ([]string, error) {
return tags.List(), nil
}
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) error {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
if err != nil {
return fmt.Errorf("failed to list gce IP addresses: %v", err)
}
for _, addr := range addresses.Items {
if addr.Address == ipAddress {
// This project does own the address, so return success.
return nil
}
}
return fmt.Errorf("this gce project doesn't own the IP address: %s", ipAddress)
}
func (gce *GCECloud) createOrPromoteStaticIP(name, region, existingIP string) (ipAddress string, err error) {
// If the address doesn't exist, this will create it.
// If the existingIP exists but is ephemeral, this will promote it to static.
// If the address already exists, this will harmlessly return a StatusConflict
// and we'll grab the IP before returning.
addressObj := &compute.Address{Name: name}
if existingIP != "" {
addressObj.Address = existingIP
}
op, err := gce.service.Addresses.Insert(gce.projectID, region, addressObj).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", fmt.Errorf("error creating gce static IP address: %v", err)
}
if op != nil {
err := gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", fmt.Errorf("error waiting for gce static IP address to be created: %v", err)
}
}
// We have to get the address to know which IP was allocated for us.
address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
if err != nil {
return "", fmt.Errorf("error re-getting gce static IP address: %v", err)
}
return address.Address, nil
}
// UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer.
func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
@ -615,17 +807,17 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)
func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
err := errors.AggregateGoroutines(
func() error { return gce.deleteFirewall(name, region) },
// Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureTCPLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case.
func() error { return gce.deleteStaticIP(name, region) },
func() error {
// The forwarding rule must be deleted before either the target pool can,
// unfortunately, so we have to do these two serially.
if err := gce.deleteForwardingRule(name, region); err != nil {
return err
}
// The forwarding rule must be deleted before either the target pool or
// static IP address can, unfortunately.
err := errors.AggregateGoroutines(
func() error { return gce.deleteTargetPool(name, region) },
func() error { return gce.deleteStaticIP(name, region) },
)
if err != nil {
if err := gce.deleteTargetPool(name, region); err != nil {
return err
}
return nil

View File

@ -16,68 +16,7 @@ limitations under the License.
package gce_cloud
import (
"net"
"testing"
compute "google.golang.org/api/compute/v1"
)
func TestOwnsAddress(t *testing.T) {
tests := []struct {
ip net.IP
addrs []*compute.Address
expectOwn bool
}{
{
ip: net.ParseIP("1.2.3.4"),
addrs: []*compute.Address{},
expectOwn: false,
},
{
ip: net.ParseIP("1.2.3.4"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: false,
},
{
ip: net.ParseIP("2.3.4.5"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
{
ip: net.ParseIP("2.3.4.6"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
{
ip: net.ParseIP("2.3.4.7"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
}
for _, test := range tests {
own := ownsAddress(test.ip, test.addrs)
if own != test.expectOwn {
t.Errorf("expected: %v, got %v for %v", test.expectOwn, own, test)
}
}
}
import "testing"
func TestGetRegion(t *testing.T) {
gce := &GCECloud{