From 0e71ea12538a79c13def753f8c5c78bb05d77177 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 28 Aug 2015 15:40:43 -0700 Subject: [PATCH 1/4] Maintain an IP address independent of the forwarding rule for GCE --- pkg/cloudprovider/providers/gce/gce.go | 73 +++++++++++++++++++++ pkg/cloudprovider/providers/gce/gce_test.go | 59 +++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 9aa599dca6a..5f853286bbc 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -349,6 +349,27 @@ func makeFirewallName(name string) string { return fmt.Sprintf("k8s-fw-%s", name) } +func (gce *GCECloud) getAddress(name, region string) (string, bool, error) { + address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do() + if err == nil { + return address.Address, true, nil + } + if isHTTPErrorCode(err, http.StatusNotFound) { + return "", false, nil + } + return "", false, err +} + +func ownsAddress(ip net.IP, addrs []*compute.Address) bool { + ipStr := ip.String() + for _, addr := range addrs { + if addr.Address == ipStr { + return true + } + } + return false +} + // EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're // owned by the project and available to be used, and use them if they are. @@ -358,6 +379,44 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n } glog.V(2).Infof("Checking if load balancer already exists: %s", name) + if loadBalancerIP == nil { + glog.V(2).Info("Checking if the external ip address already exists: %s", name) + address, exists, err := gce.getAddress(name, region) + if err != nil { + return nil, fmt.Errorf("error looking for gce address: %v", err) + } + if !exists { + // Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't. + // However, quota is limited. TODO: determine quota here + op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do() + if err != nil { + return nil, fmt.Errorf("error creating gce address: %v", err) + } + if err := gce.waitForRegionOp(op, region); err != nil { + return nil, fmt.Errorf("error waiting for gce address to complete: %v", err) + } + address, exists, err = gce.getAddress(name, region) + if err != nil { + return nil, fmt.Errorf("error re-getting gce address: %v", err) + } + if !exists { + return nil, fmt.Errorf("failed to re-get gce address for %s", name) + } + } + if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil { + return nil, fmt.Errorf("error parsing external IP: %s", address) + } + } else { + addresses, err := gce.service.Addresses.List(gce.projectID, region).Do() + if err != nil { + return nil, fmt.Errorf("failed to list gce addresses: %v", err) + } + if !ownsAddress(loadBalancerIP, addresses.Items) { + return nil, fmt.Errorf("don't own the address: %s", loadBalancerIP.String()) + } + } + + glog.V(2).Info("Checking if load balancer already exists: %s", name) _, exists, err := gce.GetTCPLoadBalancer(name, region) if err != nil { return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err) @@ -395,6 +454,7 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n } req := &compute.ForwardingRule{ Name: name, + IPAddress: loadBalancerIP.String(), IPProtocol: "TCP", PortRange: fmt.Sprintf("%d-%d", minPort, maxPort), Target: gce.targetPoolURL(name, region), @@ -569,6 +629,19 @@ func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { return err } } + + op, err = gce.service.Addresses.Delete(gce.projectID, region, name).Do() + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Address %s is already deleted.", name) + } else if err != nil { + glog.Warningf("Failed to delete Address %s, got error %v", name, err) + return err + } + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err) + return err + } + op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { glog.Infof("Target pool %s already deleted.", name) diff --git a/pkg/cloudprovider/providers/gce/gce_test.go b/pkg/cloudprovider/providers/gce/gce_test.go index 34ff3b3382e..5bf6eb83517 100644 --- a/pkg/cloudprovider/providers/gce/gce_test.go +++ b/pkg/cloudprovider/providers/gce/gce_test.go @@ -17,9 +17,68 @@ limitations under the License. package gce_cloud import ( + "net" "testing" + + compute "google.golang.org/api/compute/v1" ) +func TestOwnsAddress(t *testing.T) { + tests := []struct { + ip net.IP + addrs []*compute.Address + expectOwn bool + }{ + { + ip: net.ParseIP("1.2.3.4"), + addrs: []*compute.Address{}, + expectOwn: false, + }, + { + ip: net.ParseIP("1.2.3.4"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: false, + }, + { + ip: net.ParseIP("2.3.4.5"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: true, + }, + { + ip: net.ParseIP("2.3.4.6"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: true, + }, + { + ip: net.ParseIP("2.3.4.7"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: true, + }, + } + for _, test := range tests { + own := ownsAddress(test.ip, test.addrs) + if own != test.expectOwn { + t.Errorf("expected: %v, got %v for %v", test.expectOwn, own, test) + } + } +} + func TestGetRegion(t *testing.T) { gce := &GCECloud{ zone: "us-central1-b", From cfdde6f7d4f63e891776c8c2895a9b9072e31686 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 25 Sep 2015 18:11:52 +0000 Subject: [PATCH 2/4] Fix PR for maintaining a GCE IP independently of the forwarding rule. This code was in rough shape, so I've fixed the issues with the original PR as well as a few other changes: 1. Clarify the error messages related to the "gce Addresses" to make it clear we're talking about static IP addresses 2. Fix the bug in the original PR, which was a nil pointer dereference from passing op to waitForRegionOp when the address doesn't exist. 3. Rearrange the steps of EnsureTCPLoadBalancerDeleted to be the reverse of EnsureCreated, which mostly just seems like good practice to me. This is also supported by the following two bugs I found :( 4. Fix an independent bug of returning too early if the target pool doesn't exist, effectively stranding the firewall. This was likely introduced because target pools used to be the last thing deleted, so it was previously safe to return there. 5. Fix an independent bug of not returning an error waiting for the target pool to be deleted failed. This was very possibly causing target pool leaks in our e2e tests. This was similarly due to assuming that the target pool was the last thing deleted in the function, then having the firewall deletion stuck in after it. --- pkg/cloudprovider/providers/gce/gce.go | 84 +++++++++++++------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 5f853286bbc..60883cff495 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -387,32 +387,32 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n } if !exists { // Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't. - // However, quota is limited. TODO: determine quota here + // However, quota is limited to only 7 addresses per region by default. op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do() if err != nil { - return nil, fmt.Errorf("error creating gce address: %v", err) + return nil, fmt.Errorf("error creating gce static IP address: %v", err) } if err := gce.waitForRegionOp(op, region); err != nil { - return nil, fmt.Errorf("error waiting for gce address to complete: %v", err) + return nil, fmt.Errorf("error waiting for gce static IP address to complete: %v", err) } address, exists, err = gce.getAddress(name, region) if err != nil { - return nil, fmt.Errorf("error re-getting gce address: %v", err) + return nil, fmt.Errorf("error re-getting gce static IP address: %v", err) } if !exists { - return nil, fmt.Errorf("failed to re-get gce address for %s", name) + return nil, fmt.Errorf("failed to re-get gce static IP address for %s", name) } } if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil { - return nil, fmt.Errorf("error parsing external IP: %s", address) + return nil, fmt.Errorf("error parsing gce static IP address: %s", address) } } else { addresses, err := gce.service.Addresses.List(gce.projectID, region).Do() if err != nil { - return nil, fmt.Errorf("failed to list gce addresses: %v", err) + return nil, fmt.Errorf("failed to list gce IP addresses: %v", err) } if !ownsAddress(loadBalancerIP, addresses.Items) { - return nil, fmt.Errorf("don't own the address: %s", loadBalancerIP.String()) + return nil, fmt.Errorf("this gce project don't own the IP address: %s", loadBalancerIP.String()) } } @@ -459,9 +459,6 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n PortRange: fmt.Sprintf("%d-%d", minPort, maxPort), Target: gce.targetPoolURL(name, region), } - if loadBalancerIP != nil { - req.IPAddress = loadBalancerIP.String() - } op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { @@ -616,9 +613,23 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) // EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { - op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() + fwName := makeFirewallName(name) + op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Forwarding rule %s already deleted. Continuing to delete target pool.", name) + glog.Infof("Firewall %s already deleted. Moving on to delete forwarding rule.", name) + } else if err != nil { + glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err) + return err + } else { + if err = gce.waitForGlobalOp(op); err != nil { + glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err) + return err + } + } + + op, err = gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Forwarding rule %s already deleted. Moving on to delete target pool.", name) } else if err != nil { glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error()) return err @@ -630,44 +641,33 @@ func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { } } - op, err = gce.service.Addresses.Delete(gce.projectID, region, name).Do() - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Address %s is already deleted.", name) - } else if err != nil { - glog.Warningf("Failed to delete Address %s, got error %v", name, err) - return err - } - if err := gce.waitForRegionOp(op, region); err != nil { - glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err) - return err - } - op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Target pool %s already deleted.", name) - return nil + glog.Infof("Target pool %s already deleted. Moving on to delete static IP address.", name) } else if err != nil { glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error()) return err - } - err = gce.waitForRegionOp(op, region) - if err != nil { - glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) - } - fwName := makeFirewallName(name) - op, err = gce.service.Firewalls.Delete(gce.projectID, fwName).Do() - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Firewall doesn't exist, moving on to deleting target pool.") - } else if err != nil { - glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err) - return err } else { - if err = gce.waitForGlobalOp(op); err != nil { - glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err) + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) return err } } - return err + + op, err = gce.service.Addresses.Delete(gce.projectID, region, name).Do() + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Static IP address %s already deleted. Done deleting load balancer.", name) + } else if err != nil { + glog.Warningf("Failed to delete static IP Address %s, got error %v", name, err) + return err + } else { + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err) + return err + } + } + + return nil } // UrlMap management From 2feb54ce4097d6efbeb957ea0473b3394ee82002 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 2 Oct 2015 05:07:36 +0000 Subject: [PATCH 3/4] Add utility function for errors that runs multiple functions that errors as goroutines, blocks until they're all done executing, and combines the results into an Aggregate error. --- pkg/util/errors/errors.go | 17 +++++++++ pkg/util/errors/errors_test.go | 63 ++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go index c3012737012..a1a8e7aa24c 100644 --- a/pkg/util/errors/errors.go +++ b/pkg/util/errors/errors.go @@ -131,3 +131,20 @@ func Flatten(agg Aggregate) Aggregate { } return NewAggregate(result) } + +// AggregateGoroutines runs the provided functions in parallel, stuffing all +// non-nil errors into the returned Aggregate. +// Returns nil if all the functions complete successfully. +func AggregateGoroutines(funcs ...func() error) Aggregate { + errChan := make(chan error, len(funcs)) + for _, f := range funcs { + go func(f func() error) { errChan <- f() }(f) + } + errs := make([]error, 0) + for i := 0; i < cap(errChan); i++ { + if err := <-errChan; err != nil { + errs = append(errs, err) + } + } + return NewAggregate(errs) +} diff --git a/pkg/util/errors/errors_test.go b/pkg/util/errors/errors_test.go index 1d77fddde0d..7ecf919ffb0 100644 --- a/pkg/util/errors/errors_test.go +++ b/pkg/util/errors/errors_test.go @@ -221,3 +221,66 @@ func TestFlatten(t *testing.T) { } } } + +func TestAggregateGoroutines(t *testing.T) { + testCases := []struct { + errs []error + expected map[string]bool // can't compare directly to Aggregate due to non-deterministic ordering + }{ + { + []error{}, + nil, + }, + { + []error{nil}, + nil, + }, + { + []error{nil, nil}, + nil, + }, + { + []error{fmt.Errorf("1")}, + map[string]bool{"1": true}, + }, + { + []error{fmt.Errorf("1"), nil}, + map[string]bool{"1": true}, + }, + { + []error{fmt.Errorf("1"), fmt.Errorf("267")}, + map[string]bool{"1": true, "267": true}, + }, + { + []error{fmt.Errorf("1"), nil, fmt.Errorf("1234")}, + map[string]bool{"1": true, "1234": true}, + }, + { + []error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")}, + map[string]bool{"1": true, "1234": true, "22": true}, + }, + } + for i, testCase := range testCases { + funcs := make([]func() error, len(testCase.errs)) + for i := range testCase.errs { + err := testCase.errs[i] + funcs[i] = func() error { return err } + } + agg := AggregateGoroutines(funcs...) + if agg == nil { + if len(testCase.expected) > 0 { + t.Errorf("%d: expected %v, got nil", i, testCase.expected) + } + continue + } + if len(agg.Errors()) != len(testCase.expected) { + t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg) + continue + } + for _, err := range agg.Errors() { + if !testCase.expected[err.Error()] { + t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err) + } + } + } +} From 90a9e01a689de36e5143575e8e8d7bf9741112c4 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 2 Oct 2015 05:09:35 +0000 Subject: [PATCH 4/4] Delete GCE external load balancer components in parallel. This will cut down on the amount of time it takes to delete an external load balancer, which should reduce the likelihood of resource leaks when clusters are deleted. --- pkg/cloudprovider/providers/gce/gce.go | 102 ++++++++++++++++--------- 1 file changed, 67 insertions(+), 35 deletions(-) diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 60883cff495..c6dbb896b54 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -378,9 +379,8 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts") } - glog.V(2).Infof("Checking if load balancer already exists: %s", name) if loadBalancerIP == nil { - glog.V(2).Info("Checking if the external ip address already exists: %s", name) + glog.V(2).Info("Checking if the static IP address already exists: %s", name) address, exists, err := gce.getAddress(name, region) if err != nil { return nil, fmt.Errorf("error looking for gce address: %v", err) @@ -613,52 +613,85 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) // EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { + err := errors.AggregateGoroutines( + func() error { return gce.deleteFirewall(name, region) }, + func() error { + if err := gce.deleteForwardingRule(name, region); err != nil { + return err + } + // The forwarding rule must be deleted before either the target pool or + // static IP address can, unfortunately. + err := errors.AggregateGoroutines( + func() error { return gce.deleteTargetPool(name, region) }, + func() error { return gce.deleteStaticIP(name, region) }, + ) + if err != nil { + return err + } + return nil + }, + ) + if err != nil { + return errors.Flatten(err) + } + return nil +} + +func (gce *GCECloud) deleteForwardingRule(name, region string) error { + op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name) + } else if err != nil { + glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error()) + return err + } else { + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.", name, err.Error()) + return err + } + } + return nil +} + +func (gce *GCECloud) deleteTargetPool(name, region string) error { + op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do() + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name) + } else if err != nil { + glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error()) + return err + } else { + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.", name, err.Error()) + return err + } + } + return nil +} + +func (gce *GCECloud) deleteFirewall(name, region string) error { fwName := makeFirewallName(name) op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Firewall %s already deleted. Moving on to delete forwarding rule.", name) + glog.Infof("Firewall %s already deleted. Continuing to delete other resources.", name) } else if err != nil { glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err) return err } else { - if err = gce.waitForGlobalOp(op); err != nil { + if err := gce.waitForGlobalOp(op); err != nil { glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err) return err } } + return nil +} - op, err = gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() +func (gce *GCECloud) deleteStaticIP(name, region string) error { + op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Forwarding rule %s already deleted. Moving on to delete target pool.", name) + glog.Infof("Static IP address %s already deleted. Continuing to delete other resources.", name) } else if err != nil { - glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error()) - return err - } else { - err = gce.waitForRegionOp(op, region) - if err != nil { - glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error()) - return err - } - } - - op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do() - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Target pool %s already deleted. Moving on to delete static IP address.", name) - } else if err != nil { - glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error()) - return err - } else { - if err := gce.waitForRegionOp(op, region); err != nil { - glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) - return err - } - } - - op, err = gce.service.Addresses.Delete(gce.projectID, region, name).Do() - if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Static IP address %s already deleted. Done deleting load balancer.", name) - } else if err != nil { - glog.Warningf("Failed to delete static IP Address %s, got error %v", name, err) + glog.Warningf("Failed to delete static IP address %s, got error %v", name, err) return err } else { if err := gce.waitForRegionOp(op, region); err != nil { @@ -666,7 +699,6 @@ func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { return err } } - return nil }