mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 01:06:27 +00:00
Merge pull request #14964 from a-robinson/lbdelete
Unrevert #14608 and decrease the latency of GCE load balancer deletions
This commit is contained in:
commit
d481ba7547
@ -28,6 +28,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
|
"k8s.io/kubernetes/pkg/util/errors"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
|
||||||
@ -349,6 +350,27 @@ func makeFirewallName(name string) string {
|
|||||||
return fmt.Sprintf("k8s-fw-%s", name)
|
return fmt.Sprintf("k8s-fw-%s", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) getAddress(name, region string) (string, bool, error) {
|
||||||
|
address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
|
||||||
|
if err == nil {
|
||||||
|
return address.Address, true, nil
|
||||||
|
}
|
||||||
|
if isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
|
return "", false, nil
|
||||||
|
}
|
||||||
|
return "", false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func ownsAddress(ip net.IP, addrs []*compute.Address) bool {
|
||||||
|
ipStr := ip.String()
|
||||||
|
for _, addr := range addrs {
|
||||||
|
if addr.Address == ipStr {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
|
// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
|
||||||
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
|
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
|
||||||
// owned by the project and available to be used, and use them if they are.
|
// owned by the project and available to be used, and use them if they are.
|
||||||
@ -357,7 +379,44 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
|
|||||||
return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts")
|
return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts")
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("Checking if load balancer already exists: %s", name)
|
if loadBalancerIP == nil {
|
||||||
|
glog.V(2).Info("Checking if the static IP address already exists: %s", name)
|
||||||
|
address, exists, err := gce.getAddress(name, region)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error looking for gce address: %v", err)
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
// Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't.
|
||||||
|
// However, quota is limited to only 7 addresses per region by default.
|
||||||
|
op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error creating gce static IP address: %v", err)
|
||||||
|
}
|
||||||
|
if err := gce.waitForRegionOp(op, region); err != nil {
|
||||||
|
return nil, fmt.Errorf("error waiting for gce static IP address to complete: %v", err)
|
||||||
|
}
|
||||||
|
address, exists, err = gce.getAddress(name, region)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error re-getting gce static IP address: %v", err)
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("failed to re-get gce static IP address for %s", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil {
|
||||||
|
return nil, fmt.Errorf("error parsing gce static IP address: %s", address)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list gce IP addresses: %v", err)
|
||||||
|
}
|
||||||
|
if !ownsAddress(loadBalancerIP, addresses.Items) {
|
||||||
|
return nil, fmt.Errorf("this gce project don't own the IP address: %s", loadBalancerIP.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(2).Info("Checking if load balancer already exists: %s", name)
|
||||||
_, exists, err := gce.GetTCPLoadBalancer(name, region)
|
_, exists, err := gce.GetTCPLoadBalancer(name, region)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err)
|
return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err)
|
||||||
@ -395,13 +454,11 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
|
|||||||
}
|
}
|
||||||
req := &compute.ForwardingRule{
|
req := &compute.ForwardingRule{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
IPAddress: loadBalancerIP.String(),
|
||||||
IPProtocol: "TCP",
|
IPProtocol: "TCP",
|
||||||
PortRange: fmt.Sprintf("%d-%d", minPort, maxPort),
|
PortRange: fmt.Sprintf("%d-%d", minPort, maxPort),
|
||||||
Target: gce.targetPoolURL(name, region),
|
Target: gce.targetPoolURL(name, region),
|
||||||
}
|
}
|
||||||
if loadBalancerIP != nil {
|
|
||||||
req.IPAddress = loadBalancerIP.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
|
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
|
||||||
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
|
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
|
||||||
@ -556,45 +613,93 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)
|
|||||||
|
|
||||||
// EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted.
|
// EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted.
|
||||||
func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
|
func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
|
||||||
|
err := errors.AggregateGoroutines(
|
||||||
|
func() error { return gce.deleteFirewall(name, region) },
|
||||||
|
func() error {
|
||||||
|
if err := gce.deleteForwardingRule(name, region); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// The forwarding rule must be deleted before either the target pool or
|
||||||
|
// static IP address can, unfortunately.
|
||||||
|
err := errors.AggregateGoroutines(
|
||||||
|
func() error { return gce.deleteTargetPool(name, region) },
|
||||||
|
func() error { return gce.deleteStaticIP(name, region) },
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Flatten(err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) deleteForwardingRule(name, region string) error {
|
||||||
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
|
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
|
||||||
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
glog.Infof("Forwarding rule %s already deleted. Continuing to delete target pool.", name)
|
glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error())
|
glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error())
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
err = gce.waitForRegionOp(op, region)
|
if err := gce.waitForRegionOp(op, region); err != nil {
|
||||||
if err != nil {
|
glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.", name, err.Error())
|
||||||
glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error())
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) deleteTargetPool(name, region string) error {
|
||||||
|
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
|
||||||
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
glog.Infof("Target pool %s already deleted.", name)
|
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
|
||||||
return nil
|
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error())
|
glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error())
|
||||||
return err
|
return err
|
||||||
|
} else {
|
||||||
|
if err := gce.waitForRegionOp(op, region); err != nil {
|
||||||
|
glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.", name, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err = gce.waitForRegionOp(op, region)
|
return nil
|
||||||
if err != nil {
|
}
|
||||||
glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error())
|
|
||||||
}
|
func (gce *GCECloud) deleteFirewall(name, region string) error {
|
||||||
fwName := makeFirewallName(name)
|
fwName := makeFirewallName(name)
|
||||||
op, err = gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
|
op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
|
||||||
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
glog.Infof("Firewall doesn't exist, moving on to deleting target pool.")
|
glog.Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err)
|
glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err)
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
if err = gce.waitForGlobalOp(op); err != nil {
|
if err := gce.waitForGlobalOp(op); err != nil {
|
||||||
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err)
|
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) deleteStaticIP(name, region string) error {
|
||||||
|
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
|
||||||
|
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
|
glog.Infof("Static IP address %s already deleted. Continuing to delete other resources.", name)
|
||||||
|
} else if err != nil {
|
||||||
|
glog.Warningf("Failed to delete static IP address %s, got error %v", name, err)
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
if err := gce.waitForRegionOp(op, region); err != nil {
|
||||||
|
glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UrlMap management
|
// UrlMap management
|
||||||
|
@ -17,9 +17,68 @@ limitations under the License.
|
|||||||
package gce_cloud
|
package gce_cloud
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
compute "google.golang.org/api/compute/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestOwnsAddress(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
ip net.IP
|
||||||
|
addrs []*compute.Address
|
||||||
|
expectOwn bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
ip: net.ParseIP("1.2.3.4"),
|
||||||
|
addrs: []*compute.Address{},
|
||||||
|
expectOwn: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ip: net.ParseIP("1.2.3.4"),
|
||||||
|
addrs: []*compute.Address{
|
||||||
|
{Address: "2.3.4.5"},
|
||||||
|
{Address: "2.3.4.6"},
|
||||||
|
{Address: "2.3.4.7"},
|
||||||
|
},
|
||||||
|
expectOwn: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ip: net.ParseIP("2.3.4.5"),
|
||||||
|
addrs: []*compute.Address{
|
||||||
|
{Address: "2.3.4.5"},
|
||||||
|
{Address: "2.3.4.6"},
|
||||||
|
{Address: "2.3.4.7"},
|
||||||
|
},
|
||||||
|
expectOwn: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ip: net.ParseIP("2.3.4.6"),
|
||||||
|
addrs: []*compute.Address{
|
||||||
|
{Address: "2.3.4.5"},
|
||||||
|
{Address: "2.3.4.6"},
|
||||||
|
{Address: "2.3.4.7"},
|
||||||
|
},
|
||||||
|
expectOwn: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ip: net.ParseIP("2.3.4.7"),
|
||||||
|
addrs: []*compute.Address{
|
||||||
|
{Address: "2.3.4.5"},
|
||||||
|
{Address: "2.3.4.6"},
|
||||||
|
{Address: "2.3.4.7"},
|
||||||
|
},
|
||||||
|
expectOwn: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range tests {
|
||||||
|
own := ownsAddress(test.ip, test.addrs)
|
||||||
|
if own != test.expectOwn {
|
||||||
|
t.Errorf("expected: %v, got %v for %v", test.expectOwn, own, test)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetRegion(t *testing.T) {
|
func TestGetRegion(t *testing.T) {
|
||||||
gce := &GCECloud{
|
gce := &GCECloud{
|
||||||
zone: "us-central1-b",
|
zone: "us-central1-b",
|
||||||
|
@ -131,3 +131,20 @@ func Flatten(agg Aggregate) Aggregate {
|
|||||||
}
|
}
|
||||||
return NewAggregate(result)
|
return NewAggregate(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AggregateGoroutines runs the provided functions in parallel, stuffing all
|
||||||
|
// non-nil errors into the returned Aggregate.
|
||||||
|
// Returns nil if all the functions complete successfully.
|
||||||
|
func AggregateGoroutines(funcs ...func() error) Aggregate {
|
||||||
|
errChan := make(chan error, len(funcs))
|
||||||
|
for _, f := range funcs {
|
||||||
|
go func(f func() error) { errChan <- f() }(f)
|
||||||
|
}
|
||||||
|
errs := make([]error, 0)
|
||||||
|
for i := 0; i < cap(errChan); i++ {
|
||||||
|
if err := <-errChan; err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NewAggregate(errs)
|
||||||
|
}
|
||||||
|
@ -221,3 +221,66 @@ func TestFlatten(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAggregateGoroutines(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
errs []error
|
||||||
|
expected map[string]bool // can't compare directly to Aggregate due to non-deterministic ordering
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
[]error{},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]error{nil},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]error{nil, nil},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]error{fmt.Errorf("1")},
|
||||||
|
map[string]bool{"1": true},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]error{fmt.Errorf("1"), nil},
|
||||||
|
map[string]bool{"1": true},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]error{fmt.Errorf("1"), fmt.Errorf("267")},
|
||||||
|
map[string]bool{"1": true, "267": true},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]error{fmt.Errorf("1"), nil, fmt.Errorf("1234")},
|
||||||
|
map[string]bool{"1": true, "1234": true},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")},
|
||||||
|
map[string]bool{"1": true, "1234": true, "22": true},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, testCase := range testCases {
|
||||||
|
funcs := make([]func() error, len(testCase.errs))
|
||||||
|
for i := range testCase.errs {
|
||||||
|
err := testCase.errs[i]
|
||||||
|
funcs[i] = func() error { return err }
|
||||||
|
}
|
||||||
|
agg := AggregateGoroutines(funcs...)
|
||||||
|
if agg == nil {
|
||||||
|
if len(testCase.expected) > 0 {
|
||||||
|
t.Errorf("%d: expected %v, got nil", i, testCase.expected)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(agg.Errors()) != len(testCase.expected) {
|
||||||
|
t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, err := range agg.Errors() {
|
||||||
|
if !testCase.expected[err.Error()] {
|
||||||
|
t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user