Merge pull request #20662 from thockin/e2e-ip-leak

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot
2016-02-06 23:30:56 -08:00
4 changed files with 108 additions and 37 deletions

View File

@@ -409,6 +409,7 @@ kube::golang::build_binaries_for_platform() {
# Go 1.4 added -o to control where the binary is saved, but Go 1.3 doesn't # Go 1.4 added -o to control where the binary is saved, but Go 1.3 doesn't
# have this flag. Whenever we deprecate go 1.3, update to use -o instead of # have this flag. Whenever we deprecate go 1.3, update to use -o instead of
# changing into the output directory. # changing into the output directory.
mkdir -p "$(dirname ${outfile})"
pushd "$(dirname ${outfile})" >/dev/null pushd "$(dirname ${outfile})" >/dev/null
go test -c \ go test -c \
"${goflags[@]:+${goflags[@]}}" \ "${goflags[@]:+${goflags[@]}}" \

View File

@@ -432,7 +432,11 @@ func isHTTPErrorCode(err error, code int) bool {
// new load balancers and updating existing load balancers, recognizing when // new load balancers and updating existing load balancers, recognizing when
// each is needed. // each is needed.
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, ports, hostNames) portStr := []string{}
for _, p := range ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames)
if len(hostNames) == 0 { if len(hostNames) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts") return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
@@ -465,53 +469,86 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// function in order to maintain the invariant that "if the forwarding rule // function in order to maintain the invariant that "if the forwarding rule
// exists, the LB has been fully created". // exists, the LB has been fully created".
ipAddress := "" ipAddress := ""
// Through this process we try to keep track of whether it is safe to // Through this process we try to keep track of whether it is safe to
// release the IP that was allocated. If the user specifically asked for // release the IP that was allocated. If the user specifically asked for
// an IP, we assume they are managing it themselves. Otherwise, we will // an IP, we assume they are managing it themselves. Otherwise, we will
// release the IP in case of early-terminating failure or upon successful // release the IP in case of early-terminating failure or upon successful
// creating of the LB. // creating of the LB.
releaseStaticIP := false isUserOwnedIP := false // if this is set, we never release the IP
isSafeToReleaseIP := false
defer func() {
if isUserOwnedIP {
return
}
if isSafeToReleaseIP {
if err := gce.deleteStaticIP(name, region); err != nil {
glog.Errorf("failed to release static IP %s for load balancer (%v, %v): %v", ipAddress, name, region, err)
}
glog.V(2).Infof("EnsureLoadBalancer(%v): released static IP %s", name, ipAddress)
} else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
}
}()
if requestedIP != nil { if requestedIP != nil {
// If a specific IP address has been requested, we have to respect the // If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using // user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an // a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which // ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway). // case we shouldn't delete it anyway).
if err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil { if isStatic, err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil {
return nil, err return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", requestedIP.String(), err)
} else if isStatic {
// The requested IP is a static IP, owned and managed by the user.
isUserOwnedIP = true
isSafeToReleaseIP = false
ipAddress = requestedIP.String()
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided static IP %s", name, ipAddress)
} else if requestedIP.String() == fwdRuleIP {
// The requested IP is not a static IP, but is currently assigned
// to this forwarding rule, so we can keep it.
isUserOwnedIP = false
isSafeToReleaseIP = true
ipAddress, _, err = gce.ensureStaticIP(name, region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided non-static IP %s", name, ipAddress)
} else {
// The requested IP is not static and it is not assigned to the
// current forwarding rule. It might be attached to a different
// rule or it might not be part of this project at all. Either
// way, we can't use it.
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s: %v", requestedIP.String(), name, err)
} }
ipAddress = requestedIP.String()
} else { } else {
// The user did not request a specific IP.
isUserOwnedIP = false
// This will either allocate a new static IP if the forwarding rule didn't // This will either allocate a new static IP if the forwarding rule didn't
// already have an IP, or it will promote the forwarding rule's IP from // already have an IP, or it will promote the forwarding rule's current
// ephemeral to static, or it will just get the IP if it is already // IP from ephemeral to static, or it will just get the IP if it is
// static. // already static.
existed := false existed := false
ipAddress, existed, err = gce.ensureStaticIP(name, region, fwdRuleIP) ipAddress, existed, err = gce.ensureStaticIP(name, region, fwdRuleIP)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
} }
if existed { if existed {
// If the IP was not specifically requested by the user, but it // If the IP was not specifically requested by the user, but it
// already existed, it seems to be a failed update cycle. We can // already existed, it seems to be a failed update cycle. We can
// use this IP and try to run through the process again, but we // use this IP and try to run through the process again, but we
// should not release the IP unless it is explicitly flagged as OK. // should not release the IP unless it is explicitly flagged as OK.
releaseStaticIP = false isSafeToReleaseIP = false
glog.V(4).Infof("EnsureLoadBalancer(%v): adopting static IP %s", name, ipAddress)
} else { } else {
// For total clarity. The IP did not pre-exist and the user did // For total clarity. The IP did not pre-exist and the user did
// not ask for a particular one, so we can release the IP in case // not ask for a particular one, so we can release the IP in case
// of failure or success. // of failure or success.
releaseStaticIP = true isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v): allocated static IP %s", name, ipAddress)
} }
defer func() {
if releaseStaticIP {
if err := gce.deleteStaticIP(name, region); err != nil {
glog.Errorf("failed to release static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
}
} else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
}
}()
} }
// Deal with the firewall next. The reason we do this here rather than last // Deal with the firewall next. The reason we do this here rather than last
@@ -530,10 +567,12 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil { if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
return nil, err return nil, err
} }
glog.V(4).Infof("EnsureLoadBalancer(%v): updated firewall", name)
} else { } else {
if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil { if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
return nil, err return nil, err
} }
glog.V(4).Infof("EnsureLoadBalancer(%v): created firewall", name)
} }
} }
@@ -553,15 +592,17 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// Begin critical section. If we have to delete the forwarding rule, // Begin critical section. If we have to delete the forwarding rule,
// and something should fail before we recreate it, don't release the // and something should fail before we recreate it, don't release the
// IP. That way we can come back to it later. // IP. That way we can come back to it later.
releaseStaticIP = false isSafeToReleaseIP = false
if err := gce.deleteForwardingRule(name, region); err != nil { if err := gce.deleteForwardingRule(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err) return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted forwarding rule", name)
} }
if tpExists && tpNeedsUpdate { if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(name, region); err != nil { if err := gce.deleteTargetPool(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err) return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted target pool", name)
} }
// Once we've deleted the resources (if necessary), build them back up (or for // Once we've deleted the resources (if necessary), build them back up (or for
@@ -570,6 +611,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil { if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err) return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v): created target pool", name)
} }
if tpNeedsUpdate || fwdRuleNeedsUpdate { if tpNeedsUpdate || fwdRuleNeedsUpdate {
if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil { if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil {
@@ -577,8 +619,10 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
} }
// End critical section. It is safe to release the static IP (which // End critical section. It is safe to release the static IP (which
// just demotes it to ephemeral) now that it is attached. In the case // just demotes it to ephemeral) now that it is attached. In the case
// of a user-requested IP, the 'defer' won't be installed anyway. // of a user-requested IP, the "is user-owned" flag will be set,
releaseStaticIP = true // preventing it from actually being released.
isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v): created forwarding rule, IP %s", name, ipAddress)
} }
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
@@ -875,18 +919,18 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
return tags.List(), nil return tags.List(), nil
} }
// projectOwnsStaticIP lists the addresses of gce.projectID in the given region
// and checks whether any listed address equals ipAddress.
//
// NOTE(review): this is a collapsed side-by-side diff; every line below holds
// the pre-change statement followed by the post-change statement.
//   - Pre-change (left): returns nil when a matching address is found and an
//     error when none matches.
//   - Post-change (right): returns (true, nil) when a matching address is
//     found and (false, nil) when none matches.
//   - Both versions return an error if listing the addresses fails.
//
// The name parameter is not referenced in the body shown here.
// NOTE(review): only the Items of a single List(...).Do() response are
// scanned; confirm whether additional result pages need to be followed.
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) error { func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do() addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
if err != nil { if err != nil {
return fmt.Errorf("failed to list gce IP addresses: %v", err) return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
} }
for _, addr := range addresses.Items { for _, addr := range addresses.Items {
if addr.Address == ipAddress { if addr.Address == ipAddress {
// This project does own the address, so return success. // This project does own the address, so return success.
return nil return true, nil
} }
} }
return fmt.Errorf("this gce project doesn't own the IP address: %s", ipAddress) return false, nil
} }
func (gce *GCECloud) ensureStaticIP(name, region, existingIP string) (ipAddress string, created bool, err error) { func (gce *GCECloud) ensureStaticIP(name, region, existingIP string) (ipAddress string, created bool, err error) {
@@ -994,6 +1038,7 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string)
// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted. // EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error { func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error {
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v", name, region)
err := utilerrors.AggregateGoroutines( err := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(name, region) }, func() error { return gce.deleteFirewall(name, region) },
// Even though we don't hold on to static IPs for load balancers, it's // Even though we don't hold on to static IPs for load balancers, it's
@@ -1070,7 +1115,7 @@ func (gce *GCECloud) deleteFirewall(name, region string) error {
func (gce *GCECloud) deleteStaticIP(name, region string) error { func (gce *GCECloud) deleteStaticIP(name, region string) error {
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do() op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Static IP address %s already deleted. Continuing to delete other resources.", name) glog.Infof("Static IP address %s is not reserved", name)
} else if err != nil { } else if err != nil {
glog.Warningf("Failed to delete static IP address %s, got error %v", name, err) glog.Warningf("Failed to delete static IP address %s, got error %v", name, err)
return err return err

View File

@@ -352,18 +352,20 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
// out so that we can process the delete, which we should soon be receiving // out so that we can process the delete, which we should soon be receiving
// if we haven't already. // if we haven't already.
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service that no longer exists: %v", err) glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil return nil
} }
// TODO: Try to resolve the conflict if the change was unrelated to load // TODO: Try to resolve the conflict if the change was unrelated to load
// balancer status. For now, just rely on the fact that we'll // balancer status. For now, just rely on the fact that we'll
// also process the update that caused the resource version to change. // also process the update that caused the resource version to change.
if errors.IsConflict(err) { if errors.IsConflict(err) {
glog.V(4).Infof("Not persisting update to service that has been changed since we received it: %v", err) glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
service.Namespace, service.Name, err)
return nil return nil
} }
glog.Warningf("Failed to persist updated LoadBalancerStatus to service %s after creating its load balancer: %v", glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
service.Name, err) service.Namespace, service.Name, err)
time.Sleep(clientRetryInterval) time.Sleep(clientRetryInterval)
} }
return err return err

View File

@@ -486,15 +486,20 @@ var _ = Describe("Services", func() {
// Change the services to LoadBalancer. // Change the services to LoadBalancer.
requestedIP := "" requestedIP := ""
staticIPName := ""
if providerIs("gce", "gke") { if providerIs("gce", "gke") {
By("creating a static load balancer IP") By("creating a static load balancer IP")
rand.Seed(time.Now().UTC().UnixNano()) rand.Seed(time.Now().UTC().UnixNano())
staticIPName := fmt.Sprintf("e2e-external-lb-test-%d", rand.Intn(65535)) staticIPName = fmt.Sprintf("e2e-external-lb-test-%d", rand.Intn(65535))
requestedIP, err = createGCEStaticIP(staticIPName) requestedIP, err = createGCEStaticIP(staticIPName)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
defer func() { defer func() {
// Release GCE static IP - this is not kube-managed and will not be automatically released. if staticIPName != "" {
deleteGCEStaticIP(staticIPName) // Release GCE static IP - this is not kube-managed and will not be automatically released.
if err := deleteGCEStaticIP(staticIPName); err != nil {
Logf("failed to release static IP %s: %v", staticIPName, err)
}
}
}() }()
Logf("Allocated static load balancer IP: %s", requestedIP) Logf("Allocated static load balancer IP: %s", requestedIP)
} }
@@ -523,6 +528,23 @@ var _ = Describe("Services", func() {
tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
Logf("TCP load balancer: %s", tcpIngressIP) Logf("TCP load balancer: %s", tcpIngressIP)
By("waiting for the UDP service " + serviceName + " to have a load balancer")
if providerIs("gce", "gke") {
// Do this as early as possible, which overrides the `defer` above.
// This is mostly out of fear of leaking the IP in a timeout case
// (as of this writing we're not 100% sure where the leaks are
// coming from, so this is first-aid rather than surgery).
By("demoting the static IP to ephemeral")
if staticIPName != "" {
// Deleting it after it is attached "demotes" it to an
// ephemeral IP, which can be auto-released.
if err := deleteGCEStaticIP(staticIPName); err != nil {
Failf("failed to release static IP %s: %v", staticIPName, err)
}
staticIPName = ""
}
}
By("waiting for the UDP service to have a load balancer") By("waiting for the UDP service to have a load balancer")
// 2nd one should be faster since they ran in parallel. // 2nd one should be faster since they ran in parallel.
udpService = jig.WaitForLoadBalancerOrFail(ns2, udpService.Name) udpService = jig.WaitForLoadBalancerOrFail(ns2, udpService.Name)
@@ -531,7 +553,7 @@ var _ = Describe("Services", func() {
Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort) Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort)
} }
udpIngressIP := getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) udpIngressIP := getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
Logf("UDP load balancer: %s", tcpIngressIP) Logf("UDP load balancer: %s", udpIngressIP)
By("verifying that TCP and UDP use different load balancers") By("verifying that TCP and UDP use different load balancers")
if tcpIngressIP == udpIngressIP { if tcpIngressIP == udpIngressIP {
@@ -614,7 +636,6 @@ var _ = Describe("Services", func() {
if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP { if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])) Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
} }
Logf("service port (TCP and UDP): %d", svcPort)
By("changing the UDP service's port") By("changing the UDP service's port")
udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) { udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
@@ -631,6 +652,8 @@ var _ = Describe("Services", func() {
Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])) Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
} }
Logf("service port (TCP and UDP): %d", svcPort)
By("hitting the TCP service's NodePort") By("hitting the TCP service's NodePort")
jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout) jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)