mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Demote static IPs ASAP for easier cleanup
This exposed bugs in the IP promotion/demotion logic.
This commit is contained in:
parent
78322091f0
commit
fecb71420c
@ -432,7 +432,11 @@ func isHTTPErrorCode(err error, code int) bool {
|
||||
// new load balancers and updating existing load balancers, recognizing when
|
||||
// each is needed.
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, ports, hostNames)
|
||||
portStr := []string{}
|
||||
for _, p := range ports {
|
||||
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
|
||||
}
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames)
|
||||
|
||||
if len(hostNames) == 0 {
|
||||
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
|
||||
@ -465,53 +469,86 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
// function in order to maintain the invariant that "if the forwarding rule
|
||||
// exists, the LB has been fully created".
|
||||
ipAddress := ""
|
||||
|
||||
// Through this process we try to keep track of whether it is safe to
|
||||
// release the IP that was allocated. If the user specifically asked for
|
||||
// an IP, we assume they are managing it themselves. Otherwise, we will
|
||||
// release the IP in case of early-terminating failure or upon successful
|
||||
// creating of the LB.
|
||||
releaseStaticIP := false
|
||||
isUserOwnedIP := false // if this is set, we never release the IP
|
||||
isSafeToReleaseIP := false
|
||||
defer func() {
|
||||
if isUserOwnedIP {
|
||||
return
|
||||
}
|
||||
if isSafeToReleaseIP {
|
||||
if err := gce.deleteStaticIP(name, region); err != nil {
|
||||
glog.Errorf("failed to release static IP %s for load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
}
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v): released static IP %s", name, ipAddress)
|
||||
} else {
|
||||
glog.Warningf("orphaning static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
}
|
||||
}()
|
||||
|
||||
if requestedIP != nil {
|
||||
// If a specific IP address has been requested, we have to respect the
|
||||
// user's request and use that IP. If the forwarding rule was already using
|
||||
// a different IP, it will be harmlessly abandoned because it was only an
|
||||
// ephemeral IP (or it was a different static IP owned by the user, in which
|
||||
// case we shouldn't delete it anyway).
|
||||
if err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil {
|
||||
return nil, err
|
||||
if isStatic, err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil {
|
||||
return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", requestedIP.String(), err)
|
||||
} else if isStatic {
|
||||
// The requested IP is a static IP, owned and managed by the user.
|
||||
isUserOwnedIP = true
|
||||
isSafeToReleaseIP = false
|
||||
ipAddress = requestedIP.String()
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided static IP %s", name, ipAddress)
|
||||
} else if requestedIP.String() == fwdRuleIP {
|
||||
// The requested IP is not a static IP, but is currently assigned
|
||||
// to this forwarding rule, so we can keep it.
|
||||
isUserOwnedIP = false
|
||||
isSafeToReleaseIP = true
|
||||
ipAddress, _, err = gce.ensureStaticIP(name, region, fwdRuleIP)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided non-static IP %s", name, ipAddress)
|
||||
} else {
|
||||
// The requested IP is not static and it is not assigned to the
|
||||
// current forwarding rule. It might be attached to a different
|
||||
// rule or it might not be part of this project at all. Either
|
||||
// way, we can't use it.
|
||||
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s: %v", requestedIP.String(), name, err)
|
||||
}
|
||||
ipAddress = requestedIP.String()
|
||||
} else {
|
||||
// The user did not request a specific IP.
|
||||
isUserOwnedIP = false
|
||||
|
||||
// This will either allocate a new static IP if the forwarding rule didn't
|
||||
// already have an IP, or it will promote the forwarding rule's IP from
|
||||
// ephemeral to static, or it will just get the IP if it is already
|
||||
// static.
|
||||
// already have an IP, or it will promote the forwarding rule's current
|
||||
// IP from ephemeral to static, or it will just get the IP if it is
|
||||
// already static.
|
||||
existed := false
|
||||
ipAddress, existed, err = gce.ensureStaticIP(name, region, fwdRuleIP)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
|
||||
}
|
||||
if existed {
|
||||
// If the IP was not specifically requested by the user, but it
|
||||
// already existed, it seems to be a failed update cycle. We can
|
||||
// use this IP and try to run through the process again, but we
|
||||
// should not release the IP unless it is explicitly flagged as OK.
|
||||
releaseStaticIP = false
|
||||
isSafeToReleaseIP = false
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): adopting static IP %s", name, ipAddress)
|
||||
} else {
|
||||
// For total clarity. The IP did not pre-exist and the user did
|
||||
// not ask for a particular one, so we can release the IP in case
|
||||
// of failure or success.
|
||||
releaseStaticIP = true
|
||||
isSafeToReleaseIP = true
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): allocated static IP %s", name, ipAddress)
|
||||
}
|
||||
defer func() {
|
||||
if releaseStaticIP {
|
||||
if err := gce.deleteStaticIP(name, region); err != nil {
|
||||
glog.Errorf("failed to release static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
}
|
||||
} else {
|
||||
glog.Warningf("orphaning static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Deal with the firewall next. The reason we do this here rather than last
|
||||
@ -530,10 +567,12 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): updated firewall", name)
|
||||
} else {
|
||||
if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created firewall", name)
|
||||
}
|
||||
}
|
||||
|
||||
@ -553,15 +592,17 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
// Begin critical section. If we have to delete the forwarding rule,
|
||||
// and something should fail before we recreate it, don't release the
|
||||
// IP. That way we can come back to it later.
|
||||
releaseStaticIP = false
|
||||
isSafeToReleaseIP = false
|
||||
if err := gce.deleteForwardingRule(name, region); err != nil {
|
||||
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted forwarding rule", name)
|
||||
}
|
||||
if tpExists && tpNeedsUpdate {
|
||||
if err := gce.deleteTargetPool(name, region); err != nil {
|
||||
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted target pool", name)
|
||||
}
|
||||
|
||||
// Once we've deleted the resources (if necessary), build them back up (or for
|
||||
@ -570,6 +611,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil {
|
||||
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created target pool", name)
|
||||
}
|
||||
if tpNeedsUpdate || fwdRuleNeedsUpdate {
|
||||
if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil {
|
||||
@ -577,8 +619,10 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
}
|
||||
// End critical section. It is safe to release the static IP (which
|
||||
// just demotes it to ephemeral) now that it is attached. In the case
|
||||
// of a user-requested IP, the 'defer' won't be installed anyway.
|
||||
releaseStaticIP = true
|
||||
// of a user-requested IP, the "is user-owned" flag will be set,
|
||||
// preventing it from actually being released.
|
||||
isSafeToReleaseIP = true
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created forwarding rule, IP %s", name, ipAddress)
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
@ -875,18 +919,18 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
|
||||
return tags.List(), nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) error {
|
||||
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
|
||||
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list gce IP addresses: %v", err)
|
||||
return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
|
||||
}
|
||||
for _, addr := range addresses.Items {
|
||||
if addr.Address == ipAddress {
|
||||
// This project does own the address, so return success.
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("this gce project doesn't own the IP address: %s", ipAddress)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) ensureStaticIP(name, region, existingIP string) (ipAddress string, created bool, err error) {
|
||||
@ -994,6 +1038,7 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string)
|
||||
|
||||
// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
|
||||
func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error {
|
||||
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v", name, region)
|
||||
err := utilerrors.AggregateGoroutines(
|
||||
func() error { return gce.deleteFirewall(name, region) },
|
||||
// Even though we don't hold on to static IPs for load balancers, it's
|
||||
@ -1070,7 +1115,7 @@ func (gce *GCECloud) deleteFirewall(name, region string) error {
|
||||
func (gce *GCECloud) deleteStaticIP(name, region string) error {
|
||||
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
|
||||
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
glog.Infof("Static IP address %s already deleted. Continuing to delete other resources.", name)
|
||||
glog.Infof("Static IP address %s is not reserved", name)
|
||||
} else if err != nil {
|
||||
glog.Warningf("Failed to delete static IP address %s, got error %v", name, err)
|
||||
return err
|
||||
|
@ -486,15 +486,20 @@ var _ = Describe("Services", func() {
|
||||
// Change the services to LoadBalancer.
|
||||
|
||||
requestedIP := ""
|
||||
staticIPName := ""
|
||||
if providerIs("gce", "gke") {
|
||||
By("creating a static load balancer IP")
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
staticIPName := fmt.Sprintf("e2e-external-lb-test-%d", rand.Intn(65535))
|
||||
staticIPName = fmt.Sprintf("e2e-external-lb-test-%d", rand.Intn(65535))
|
||||
requestedIP, err = createGCEStaticIP(staticIPName)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer func() {
|
||||
// Release GCE static IP - this is not kube-managed and will not be automatically released.
|
||||
deleteGCEStaticIP(staticIPName)
|
||||
if staticIPName != "" {
|
||||
// Release GCE static IP - this is not kube-managed and will not be automatically released.
|
||||
if err := deleteGCEStaticIP(staticIPName); err != nil {
|
||||
Logf("failed to release static IP %s: %v", staticIPName, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
Logf("Allocated static load balancer IP: %s", requestedIP)
|
||||
}
|
||||
@ -523,6 +528,23 @@ var _ = Describe("Services", func() {
|
||||
tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
|
||||
Logf("TCP load balancer: %s", tcpIngressIP)
|
||||
|
||||
By("waiting for the UDP service " + serviceName + " to have a load balancer")
|
||||
if providerIs("gce", "gke") {
|
||||
// Do this as early as possible, which overrides the `defer` above.
|
||||
// This is mostly out of fear of leaking the IP in a timeout case
|
||||
// (as of this writing we're not 100% sure where the leaks are
|
||||
// coming from, so this is first-aid rather than surgery).
|
||||
By("demoting the static IP to ephemeral")
|
||||
if staticIPName != "" {
|
||||
// Deleting it after it is attached "demotes" it to an
|
||||
// ephemeral IP, which can be auto-released.
|
||||
if err := deleteGCEStaticIP(staticIPName); err != nil {
|
||||
Failf("failed to release static IP %s: %v", staticIPName, err)
|
||||
}
|
||||
staticIPName = ""
|
||||
}
|
||||
}
|
||||
|
||||
By("waiting for the UDP service to have a load balancer")
|
||||
// 2nd one should be faster since they ran in parallel.
|
||||
udpService = jig.WaitForLoadBalancerOrFail(ns2, udpService.Name)
|
||||
@ -531,7 +553,7 @@ var _ = Describe("Services", func() {
|
||||
Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort)
|
||||
}
|
||||
udpIngressIP := getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
|
||||
Logf("UDP load balancer: %s", tcpIngressIP)
|
||||
Logf("UDP load balancer: %s", udpIngressIP)
|
||||
|
||||
By("verifying that TCP and UDP use different load balancers")
|
||||
if tcpIngressIP == udpIngressIP {
|
||||
@ -614,7 +636,6 @@ var _ = Describe("Services", func() {
|
||||
if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
|
||||
Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
|
||||
}
|
||||
Logf("service port (TCP and UDP): %d", svcPort)
|
||||
|
||||
By("changing the UDP service's port")
|
||||
udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
|
||||
@ -631,6 +652,8 @@ var _ = Describe("Services", func() {
|
||||
Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
|
||||
}
|
||||
|
||||
Logf("service port (TCP and UDP): %d", svcPort)
|
||||
|
||||
By("hitting the TCP service's NodePort")
|
||||
jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user