Modify external LB logic

This commit is contained in:
Nick Sardo 2017-06-01 15:23:11 -07:00
parent 2cdaf1f32b
commit 1283d65538

View File

@ -17,13 +17,10 @@ limitations under the License.
package gce
import (
"flag"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -37,95 +34,7 @@ import (
compute "google.golang.org/api/compute/v1"
)
type cidrs struct {
ipn netsets.IPNet
isSet bool
}
var (
lbSrcRngsFlag cidrs
)
func newLoadBalancerMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"loadbalancer_" + request, region, unusedMetricLabel},
}
}
func newTargetPoolMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"targetpool_" + request, region, unusedMetricLabel},
}
}
func newAddressMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"address_" + request, region, unusedMetricLabel},
}
}
func init() {
var err error
lbSrcRngsFlag.ipn, err = netsets.ParseIPNets([]string{"130.211.0.0/22", "35.191.0.0/16", "209.85.152.0/22", "209.85.204.0/22"}...)
if err != nil {
panic("Incorrect default GCE L7 source ranges")
}
flag.Var(&lbSrcRngsFlag, "cloud-provider-gce-lb-src-cidrs", "CIDRS opened in GCE firewall for LB traffic proxy & health checks")
}
// String is the method to format the flag's value, part of the flag.Value interface.
func (c *cidrs) String() string {
return strings.Join(c.ipn.StringSlice(), ",")
}
// Set supports a value of CSV or the flag repeated multiple times
func (c *cidrs) Set(value string) error {
// On first Set(), clear the original defaults
if !c.isSet {
c.isSet = true
c.ipn = make(netsets.IPNet)
} else {
return fmt.Errorf("GCE LB CIDRS have already been set")
}
for _, cidr := range strings.Split(value, ",") {
_, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return err
}
c.ipn.Insert(ipnet)
}
return nil
}
// LoadBalancerSrcRanges contains the ranges of ips used by the GCE load balancers (l4 & L7)
// for proxying client requests and performing health checks.
func LoadBalancerSrcRanges() []string {
return lbSrcRngsFlag.ipn.StringSlice()
}
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err == nil {
status := &v1.LoadBalancerStatus{}
status.Ingress = []v1.LoadBalancerIngress{{IP: fwd.IPAddress}}
return status, true, nil
}
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil, false, nil
}
return nil, false, err
}
// EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer.
// ensureExternalLoadBalancer is the external implementation of LoadBalancer.EnsureLoadBalancer.
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
@ -133,7 +42,7 @@ func (gce *GCECloud) GetLoadBalancer(clusterName string, service *v1.Service) (*
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
func (gce *GCECloud) ensureExternalLoadBalancer(clusterName string, apiService *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
if len(nodes) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
}
@ -200,10 +109,13 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
return
}
if isSafeToReleaseIP {
if err := gce.deleteStaticIP(loadBalancerName, gce.region); err != nil {
if err := gce.DeleteRegionAddress(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
} else if isNotFound(err) {
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): address %s is not reserved.", loadBalancerName, serviceName, ipAddress)
} else {
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
}
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
}
@ -329,7 +241,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
// target pool to use local traffic health check.
glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName)
if supportsNodesHealthCheck {
hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort())
hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
tpNeedsUpdate = true
}
@ -345,7 +257,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
tpNeedsUpdate = true
}
if supportsNodesHealthCheck {
hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort())
hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
}
// Now we get to some slightly more interesting logic.
@ -360,18 +272,18 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
// and something should fail before we recreate it, don't release the
// IP. That way we can come back to it later.
isSafeToReleaseIP = false
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
if err := gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
// Pass healthchecks to deleteTargetPool to cleanup health checks after cleaning up the target pool itself.
// Pass healthchecks to DeleteExternalTargetPoolAndChecks to cleanup health checks after cleaning up the target pool itself.
var hcNames []string
if hcToDelete != nil {
hcNames = append(hcNames, hcToDelete.Name)
}
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil {
if err := gce.DeleteExternalTargetPoolAndChecks(loadBalancerName, gce.region, hcNames...); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
@ -425,8 +337,8 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
return status, nil
}
// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
// updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
hosts, err := gce.getInstancesByNames(nodeNames(nodes))
if err != nil {
return err
@ -445,11 +357,9 @@ func (gce *GCECloud) UpdateLoadBalancer(clusterName string, service *v1.Service,
return gce.updateTargetPool(loadBalancerName, existing, hosts)
}
// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
// ensureExternalLoadBalancerDeleted is the external implementation of LoadBalancer.EnsureLoadBalancerDeleted
func (gce *GCECloud) ensureExternalLoadBalancerDeleted(clusterName string, service *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName,
gce.region)
var hcNames []string
if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" {
@ -473,18 +383,18 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
}
errs := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(makeFirewallName(loadBalancerName), gce.region) },
func() error { return ignoreNotFound(gce.DeleteFirewall(makeFirewallName(loadBalancerName))) },
// Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case.
func() error { return gce.deleteStaticIP(loadBalancerName, gce.region) },
func() error { return ignoreNotFound(gce.DeleteRegionAddress(loadBalancerName, gce.region)) },
func() error {
// The forwarding rule must be deleted before either the target pool can,
// unfortunately, so we have to do these two serially.
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
if err := ignoreNotFound(gce.DeleteRegionForwardingRule(loadBalancerName, gce.region)); err != nil {
return err
}
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil {
if err := gce.DeleteExternalTargetPoolAndChecks(loadBalancerName, gce.region, hcNames...); err != nil {
return err
}
return nil
@ -496,62 +406,17 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
return nil
}
func (gce *GCECloud) DeleteForwardingRule(name string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
return err
}
return gce.deleteForwardingRule(name, region)
}
func (gce *GCECloud) deleteForwardingRule(name, region string) error {
mc := newForwardingRuleMetricContext("delete", region)
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error())
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.",
name, err.Error())
return err
}
}
return nil
}
// DeleteTargetPool deletes the given target pool.
func (gce *GCECloud) DeleteTargetPool(name string, hcNames ...string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
return err
}
return gce.deleteTargetPool(name, region, hcNames...)
}
func (gce *GCECloud) deleteTargetPool(name, region string, hcNames ...string) error {
mc := newTargetPoolMetricContext("delete", region)
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
func (gce *GCECloud) DeleteExternalTargetPoolAndChecks(name, region string, hcNames ...string) error {
if err := gce.DeleteTargetPool(name, region); err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error())
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.",
name, err.Error())
return err
}
return err
}
// Deletion of health checks is allowed only after the TargetPool reference is deleted
for _, hcName := range hcNames {
if err = func() error {
if err := func() error {
// Check whether it is nodes health check, which has different name from the load-balancer.
isNodesHealthCheck := hcName != name
if isNodesHealthCheck {
@ -616,8 +481,10 @@ func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region strin
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, gce.region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
return err
if !gce.OnXPN() {
if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
return err
}
}
var err error
if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil {
@ -639,16 +506,8 @@ func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region strin
HealthChecks: hcLinks,
}
mc := newTargetPoolMetricContext("insert", region)
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return mc.Observe(err)
}
if op != nil {
err = gce.waitForRegionOp(op, region, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if _, err := gce.CreateTargetPool(pool, region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
@ -668,26 +527,13 @@ func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.Str
}
if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
mc := newTargetPoolMetricContext("update", gce.region)
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if err != nil {
return mc.Observe(err)
}
if err := gce.waitForRegionOp(op, gce.region, mc); err != nil {
if err := gce.AddInstancesToTargetPool(loadBalancerName, gce.region, toAdd); err != nil {
return err
}
}
if len(toRemove) > 0 {
mc := newTargetPoolMetricContext("delete", gce.region)
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if err != nil {
return mc.Observe(err)
}
if err := gce.waitForRegionOp(op, gce.region, mc); err != nil {
if err := gce.RemoveInstancesFromTargetPool(loadBalancerName, gce.region, toRemove); err != nil {
return err
}
}
@ -695,7 +541,7 @@ func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.Str
// Try to verify that the correct number of nodes are now in the target pool.
// We've been bitten by a bug here before (#11327) where all nodes were
// accidentally removed and want to make similar problems easier to notice.
updatedPool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
updatedPool, err := gce.GetTargetPool(loadBalancerName, gce.region)
if err != nil {
return err
}
@ -851,10 +697,6 @@ func hostURLToComparablePath(hostURL string) string {
return hostURL[idx:]
}
func makeHealthCheckDescription(serviceName string) string {
return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName)
}
func loadBalancerPortRange(ports []v1.ServicePort) (string, error) {
if len(ports) == 0 {
return "", fmt.Errorf("no ports specified for GCE load balancer")
@ -891,16 +733,12 @@ func translateAffinityType(affinityType v1.ServiceAffinity) string {
}
}
func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}
func makeFirewallDescription(serviceName, ipAddress string) string {
return fmt.Sprintf(`{"kubernetes.io/service-name":"%s", "kubernetes.io/service-ip":"%s"}`,
serviceName, ipAddress)
}
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []v1.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) {
if gce.OnXPN() {
glog.V(2).Infoln("firewallNeedsUpdate: Cluster is on XPN network - skipping firewall creation")
return false, false, nil
}
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
@ -995,54 +833,31 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
Target: gce.targetPoolURL(name, region),
}
mc := newForwardingRuleMetricContext("create", region)
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return mc.Observe(err)
}
if op != nil {
err = gce.waitForRegionOp(op, region, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if err = gce.CreateRegionForwardingRule(req, region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
mc := newFirewallMetricContext("create")
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return mc.Observe(err)
return err
}
op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return mc.Observe(err)
}
if op != nil {
err = gce.waitForGlobalOp(op, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if err = gce.CreateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
mc := newFirewallMetricContext("update")
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return mc.Observe(err)
return err
}
op, err := gce.service.Firewalls.Update(gce.projectID, name, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return mc.Observe(err)
}
if op != nil {
err = gce.waitForGlobalOp(op, mc)
if err != nil {
return err
}
if err = gce.UpdateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
return nil
}
@ -1124,66 +939,14 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
addressObj.Address = existingIP
}
mc := newAddressMetricContext("create", region)
op, err := gce.service.Addresses.Insert(gce.projectID, region, addressObj).Do()
address, err := gce.ReserveRegionAddress(addressObj, region)
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", false, fmt.Errorf("error creating gce static IP address: %v",
mc.Observe(err))
return "", false, fmt.Errorf("error creating gce static IP address: %v", err)
}
// StatusConflict == the IP exists already.
existed = true
}
if op != nil {
err := gce.waitForRegionOp(op, region, mc)
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", false, fmt.Errorf("error waiting for gce static IP address to be created: %v", err)
}
// StatusConflict == the IP exists already.
existed = true
}
}
// We have to get the address to know which IP was allocated for us.
address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
if err != nil {
return "", false, fmt.Errorf("error re-getting gce static IP address: %v", err)
}
return address.Address, existed, nil
}
func (gce *GCECloud) deleteFirewall(name, region string) error {
mc := newFirewallMetricContext("delete")
op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete firewall %s, got error %v", name, err)
return mc.Observe(err)
} else {
if err := gce.waitForGlobalOp(op, mc); err != nil {
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", name, err)
return err
}
}
return nil
}
func (gce *GCECloud) deleteStaticIP(name, region string) error {
mc := newAddressMetricContext("delete", region)
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Static IP address %s is not reserved", name)
} else if err != nil {
glog.Warningf("Failed to delete static IP address %s, got error %v", name, err)
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err)
return err
}
}
return nil
}