diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 013a3f3425f..3ca5708ecf5 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -349,6 +350,27 @@ func makeFirewallName(name string) string { return fmt.Sprintf("k8s-fw-%s", name) } +func (gce *GCECloud) getAddress(name, region string) (string, bool, error) { + address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do() + if err == nil { + return address.Address, true, nil + } + if isHTTPErrorCode(err, http.StatusNotFound) { + return "", false, nil + } + return "", false, err +} + +func ownsAddress(ip net.IP, addrs []*compute.Address) bool { + ipStr := ip.String() + for _, addr := range addrs { + if addr.Address == ipStr { + return true + } + } + return false +} + // EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're // owned by the project and available to be used, and use them if they are. @@ -357,7 +379,44 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts") } - glog.V(2).Infof("Checking if load balancer already exists: %s", name) + if loadBalancerIP == nil { + glog.V(2).Info("Checking if the static IP address already exists: %s", name) + address, exists, err := gce.getAddress(name, region) + if err != nil { + return nil, fmt.Errorf("error looking for gce address: %v", err) + } + if !exists { + // Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't. + // However, quota is limited to only 7 addresses per region by default. + op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do() + if err != nil { + return nil, fmt.Errorf("error creating gce static IP address: %v", err) + } + if err := gce.waitForRegionOp(op, region); err != nil { + return nil, fmt.Errorf("error waiting for gce static IP address to complete: %v", err) + } + address, exists, err = gce.getAddress(name, region) + if err != nil { + return nil, fmt.Errorf("error re-getting gce static IP address: %v", err) + } + if !exists { + return nil, fmt.Errorf("failed to re-get gce static IP address for %s", name) + } + } + if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil { + return nil, fmt.Errorf("error parsing gce static IP address: %s", address) + } + } else { + addresses, err := gce.service.Addresses.List(gce.projectID, region).Do() + if err != nil { + return nil, fmt.Errorf("failed to list gce IP addresses: %v", err) + } + if !ownsAddress(loadBalancerIP, addresses.Items) { + return nil, fmt.Errorf("this gce project don't own the IP address: %s", loadBalancerIP.String()) + } + } + + glog.V(2).Info("Checking if load balancer already exists: %s", name) _, exists, err := gce.GetTCPLoadBalancer(name, region) if err != nil { return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err) @@ -395,13 +454,11 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n } req := &compute.ForwardingRule{ Name: name, + IPAddress: loadBalancerIP.String(), IPProtocol: "TCP", PortRange: fmt.Sprintf("%d-%d", minPort, maxPort), Target: gce.targetPoolURL(name, region), } - if loadBalancerIP != nil { - req.IPAddress = loadBalancerIP.String() - } op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { @@ -556,45 +613,93 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) // EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { + err := errors.AggregateGoroutines( + func() error { return gce.deleteFirewall(name, region) }, + func() error { + if err := gce.deleteForwardingRule(name, region); err != nil { + return err + } + // The forwarding rule must be deleted before either the target pool or + // static IP address can, unfortunately. + err := errors.AggregateGoroutines( + func() error { return gce.deleteTargetPool(name, region) }, + func() error { return gce.deleteStaticIP(name, region) }, + ) + if err != nil { + return err + } + return nil + }, + ) + if err != nil { + return errors.Flatten(err) + } + return nil +} + +func (gce *GCECloud) deleteForwardingRule(name, region string) error { op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Forwarding rule %s already deleted. Continuing to delete target pool.", name) + glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name) } else if err != nil { - glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error()) + glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error()) return err } else { - err = gce.waitForRegionOp(op, region) - if err != nil { - glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error()) + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.", name, err.Error()) return err } } - op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do() + return nil +} + +func (gce *GCECloud) deleteTargetPool(name, region string) error { + op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Target pool %s already deleted.", name) - return nil + glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name) } else if err != nil { - glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error()) + glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error()) return err + } else { + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.", name, err.Error()) + return err + } } - err = gce.waitForRegionOp(op, region) - if err != nil { - glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) - } + return nil +} + +func (gce *GCECloud) deleteFirewall(name, region string) error { fwName := makeFirewallName(name) - op, err = gce.service.Firewalls.Delete(gce.projectID, fwName).Do() + op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Firewall doesn't exist, moving on to deleting target pool.") + glog.Infof("Firewall %s already deleted. Continuing to delete other resources.", name) } else if err != nil { glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err) return err } else { - if err = gce.waitForGlobalOp(op); err != nil { + if err := gce.waitForGlobalOp(op); err != nil { glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err) return err } } - return err + return nil +} + +func (gce *GCECloud) deleteStaticIP(name, region string) error { + op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do() + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Static IP address %s already deleted. Continuing to delete other resources.", name) + } else if err != nil { + glog.Warningf("Failed to delete static IP address %s, got error %v", name, err) + return err + } else { + if err := gce.waitForRegionOp(op, region); err != nil { + glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err) + return err + } + } + return nil } // UrlMap management diff --git a/pkg/cloudprovider/providers/gce/gce_test.go b/pkg/cloudprovider/providers/gce/gce_test.go index 34ff3b3382e..5bf6eb83517 100644 --- a/pkg/cloudprovider/providers/gce/gce_test.go +++ b/pkg/cloudprovider/providers/gce/gce_test.go @@ -17,9 +17,68 @@ limitations under the License. package gce_cloud import ( + "net" "testing" + + compute "google.golang.org/api/compute/v1" ) +func TestOwnsAddress(t *testing.T) { + tests := []struct { + ip net.IP + addrs []*compute.Address + expectOwn bool + }{ + { + ip: net.ParseIP("1.2.3.4"), + addrs: []*compute.Address{}, + expectOwn: false, + }, + { + ip: net.ParseIP("1.2.3.4"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: false, + }, + { + ip: net.ParseIP("2.3.4.5"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: true, + }, + { + ip: net.ParseIP("2.3.4.6"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: true, + }, + { + ip: net.ParseIP("2.3.4.7"), + addrs: []*compute.Address{ + {Address: "2.3.4.5"}, + {Address: "2.3.4.6"}, + {Address: "2.3.4.7"}, + }, + expectOwn: true, + }, + } + for _, test := range tests { + own := ownsAddress(test.ip, test.addrs) + if own != test.expectOwn { + t.Errorf("expected: %v, got %v for %v", test.expectOwn, own, test) + } + } +} + func TestGetRegion(t *testing.T) { gce := &GCECloud{ zone: "us-central1-b", diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go index c3012737012..a1a8e7aa24c 100644 --- a/pkg/util/errors/errors.go +++ b/pkg/util/errors/errors.go @@ -131,3 +131,20 @@ func Flatten(agg Aggregate) Aggregate { } return NewAggregate(result) } + +// AggregateGoroutines runs the provided functions in parallel, stuffing all +// non-nil errors into the returned Aggregate. +// Returns nil if all the functions complete successfully. +func AggregateGoroutines(funcs ...func() error) Aggregate { + errChan := make(chan error, len(funcs)) + for _, f := range funcs { + go func(f func() error) { errChan <- f() }(f) + } + errs := make([]error, 0) + for i := 0; i < cap(errChan); i++ { + if err := <-errChan; err != nil { + errs = append(errs, err) + } + } + return NewAggregate(errs) +} diff --git a/pkg/util/errors/errors_test.go b/pkg/util/errors/errors_test.go index 1d77fddde0d..7ecf919ffb0 100644 --- a/pkg/util/errors/errors_test.go +++ b/pkg/util/errors/errors_test.go @@ -221,3 +221,66 @@ func TestFlatten(t *testing.T) { } } } + +func TestAggregateGoroutines(t *testing.T) { + testCases := []struct { + errs []error + expected map[string]bool // can't compare directly to Aggregate due to non-deterministic ordering + }{ + { + []error{}, + nil, + }, + { + []error{nil}, + nil, + }, + { + []error{nil, nil}, + nil, + }, + { + []error{fmt.Errorf("1")}, + map[string]bool{"1": true}, + }, + { + []error{fmt.Errorf("1"), nil}, + map[string]bool{"1": true}, + }, + { + []error{fmt.Errorf("1"), fmt.Errorf("267")}, + map[string]bool{"1": true, "267": true}, + }, + { + []error{fmt.Errorf("1"), nil, fmt.Errorf("1234")}, + map[string]bool{"1": true, "1234": true}, + }, + { + []error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")}, + map[string]bool{"1": true, "1234": true, "22": true}, + }, + } + for i, testCase := range testCases { + funcs := make([]func() error, len(testCase.errs)) + for i := range testCase.errs { + err := testCase.errs[i] + funcs[i] = func() error { return err } + } + agg := AggregateGoroutines(funcs...) + if agg == nil { + if len(testCase.expected) > 0 { + t.Errorf("%d: expected %v, got nil", i, testCase.expected) + } + continue + } + if len(agg.Errors()) != len(testCase.expected) { + t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg) + continue + } + for _, err := range agg.Errors() { + if !testCase.expected[err.Error()] { + t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err) + } + } + } +}