Merge pull request #21319 from Clarifai/ensure-lb-servicename

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-27 02:03:14 -08:00
commit 394d5da23c

View File

@ -91,7 +91,7 @@ type GCECloud struct {
containerService *container.Service containerService *container.Service
projectID string projectID string
region string region string
localZone string // The zone in which we are runniing localZone string // The zone in which we are running
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master) managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
networkURL string networkURL string
useMetadataServer bool useMetadataServer bool
@ -467,11 +467,12 @@ func isHTTPErrorCode(err error, code int) bool {
// Due to an interesting series of design decisions, this handles both creating // Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when // new load balancers and updating existing load balancers, recognizing when
// each is needed. // each is needed.
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) { func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, svc types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
portStr := []string{} portStr := []string{}
for _, p := range ports { for _, p := range ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port)) portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
} }
serviceName := svc.String()
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations) glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations)
if len(hostNames) == 0 { if len(hostNames) == 0 {
@ -546,7 +547,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// to this forwarding rule, so we can keep it. // to this forwarding rule, so we can keep it.
isUserOwnedIP = false isUserOwnedIP = false
isSafeToReleaseIP = true isSafeToReleaseIP = true
ipAddress, _, err = gce.ensureStaticIP(name, region, fwdRuleIP) ipAddress, _, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
} }
@ -567,7 +568,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// IP from ephemeral to static, or it will just get the IP if it is // IP from ephemeral to static, or it will just get the IP if it is
// already static. // already static.
existed := false existed := false
ipAddress, existed, err = gce.ensureStaticIP(name, region, fwdRuleIP) ipAddress, existed, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
} }
@ -600,13 +601,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
sourceRanges = strings.Split(val, ",") sourceRanges = strings.Split(val, ",")
} }
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports, sourceRanges) firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, serviceName, region, ipAddress, ports, sourceRanges)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if firewallNeedsUpdate { if firewallNeedsUpdate {
desc := makeFirewallDescription(ipAddress) desc := makeFirewallDescription(serviceName, ipAddress)
// Unlike forwarding rules and target pools, firewalls can be updated // Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated. // without needing to be deleted and recreated.
if firewallExists { if firewallExists {
@ -654,13 +655,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// Once we've deleted the resources (if necessary), build them back up (or for // Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new). // the first time if they're new).
if tpNeedsUpdate { if tpNeedsUpdate {
if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil { if err := gce.createTargetPool(name, serviceName, region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err) return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", name, serviceName) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", name, serviceName)
} }
if tpNeedsUpdate || fwdRuleNeedsUpdate { if tpNeedsUpdate || fwdRuleNeedsUpdate {
if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil { if err := gce.createForwardingRule(name, serviceName, region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err) return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err)
} }
// End critical section. It is safe to release the static IP (which // End critical section. It is safe to release the static IP (which
@ -758,7 +759,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string {
} }
} }
func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort, sourceRanges []string) (exists bool, needsUpdate bool, err error) { func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []*api.ServicePort, sourceRanges []string) (exists bool, needsUpdate bool, err error) {
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do() fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil { if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) { if isHTTPErrorCode(err, http.StatusNotFound) {
@ -766,7 +767,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [
} }
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err) return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
} }
if fw.Description != makeFirewallDescription(ipAddress) { if fw.Description != makeFirewallDescription(serviceName, ipAddress) {
return true, true, nil return true, true, nil
} }
if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") { if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") {
@ -792,8 +793,9 @@ func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name) return fmt.Sprintf("k8s-fw-%s", name)
} }
func makeFirewallDescription(ipAddress string) string { func makeFirewallDescription(serviceName, ipAddress string) string {
return fmt.Sprintf("KubernetesAutoGenerated_OnlyAllowTrafficForDestinationIP_%s", ipAddress) return fmt.Sprintf(`{"kubernetes.io/service-ip":"%s", "kubernetes.io/service-name":"%s"}`,
ipAddress, serviceName)
} }
func slicesEqual(x, y []string) bool { func slicesEqual(x, y []string) bool {
@ -810,16 +812,18 @@ func slicesEqual(x, y []string) bool {
return true return true
} }
func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports []*api.ServicePort) error { func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []*api.ServicePort) error {
portRange, err := loadBalancerPortRange(ports) portRange, err := loadBalancerPortRange(ports)
if err != nil { if err != nil {
return err return err
} }
req := &compute.ForwardingRule{ req := &compute.ForwardingRule{
Name: name, Name: name,
IPAddress: ipAddress, IPProtocol: string(ports[0].Protocol), Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
PortRange: portRange, IPAddress: ipAddress,
Target: gce.targetPoolURL(name, region), IPProtocol: string(ports[0].Protocol),
PortRange: portRange,
Target: gce.targetPoolURL(name, region),
} }
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
@ -835,13 +839,14 @@ func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports
return nil return nil
} }
func (gce *GCECloud) createTargetPool(name, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error { func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
var instances []string var instances []string
for _, host := range hosts { for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name)) instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
} }
pool := &compute.TargetPool{ pool := &compute.TargetPool{
Name: name, Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
Instances: instances, Instances: instances,
SessionAffinity: translateAffinityType(affinityType), SessionAffinity: translateAffinityType(affinityType),
} }
@ -982,13 +987,16 @@ func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string)
return false, nil return false, nil
} }
func (gce *GCECloud) ensureStaticIP(name, region, existingIP string) (ipAddress string, created bool, err error) { func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string) (ipAddress string, created bool, err error) {
// If the address doesn't exist, this will create it. // If the address doesn't exist, this will create it.
// If the existingIP exists but is ephemeral, this will promote it to static. // If the existingIP exists but is ephemeral, this will promote it to static.
// If the address already exists, this will harmlessly return a StatusConflict // If the address already exists, this will harmlessly return a StatusConflict
// and we'll grab the IP before returning. // and we'll grab the IP before returning.
existed := false existed := false
addressObj := &compute.Address{Name: name} addressObj := &compute.Address{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
}
if existingIP != "" { if existingIP != "" {
addressObj.Address = existingIP addressObj.Address = existingIP
} }