* Added UDP LB support (for GCE)

This commit is contained in:
Kenneth Shelton 2015-09-28 13:57:58 -07:00
parent 671b5982cf
commit d399a8f8cc
21 changed files with 293 additions and 146 deletions

View File

@ -1457,8 +1457,8 @@ func ValidateService(service *api.Service) field.ErrorList {
portsPath := specPath.Child("ports") portsPath := specPath.Child("ports")
for i := range service.Spec.Ports { for i := range service.Spec.Ports {
portPath := portsPath.Index(i) portPath := portsPath.Index(i)
if service.Spec.Ports[i].Protocol != api.ProtocolTCP { if !supportedPortProtocols.Has(string(service.Spec.Ports[i].Protocol)) {
allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "may not use protocols other than 'TCP' when `type` is 'LoadBalancer'")) allErrs = append(allErrs, validation.NewInvalidError(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports"))
} }
} }
} }

View File

@ -2095,20 +2095,20 @@ func TestValidateService(t *testing.T) {
numErrs: 1, numErrs: 1,
}, },
{ {
name: "invalid load balancer protocol 1", name: "valid load balancer protocol UDP 1",
tweakSvc: func(s *api.Service) { tweakSvc: func(s *api.Service) {
s.Spec.Type = api.ServiceTypeLoadBalancer s.Spec.Type = api.ServiceTypeLoadBalancer
s.Spec.Ports[0].Protocol = "UDP" s.Spec.Ports[0].Protocol = "UDP"
}, },
numErrs: 1, numErrs: 0,
}, },
{ {
name: "invalid load balancer protocol 2", name: "valid load balancer protocol UDP 2",
tweakSvc: func(s *api.Service) { tweakSvc: func(s *api.Service) {
s.Spec.Type = api.ServiceTypeLoadBalancer s.Spec.Type = api.ServiceTypeLoadBalancer
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "UDP", TargetPort: intstr.FromInt(12345)}) s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "UDP", TargetPort: intstr.FromInt(12345)})
}, },
numErrs: 1, numErrs: 0,
}, },
{ {
name: "valid 1", name: "valid 1",

View File

@ -27,8 +27,8 @@ import (
// Interface is an abstract, pluggable interface for cloud providers. // Interface is an abstract, pluggable interface for cloud providers.
type Interface interface { type Interface interface {
// TCPLoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise. // LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
TCPLoadBalancer() (TCPLoadBalancer, bool) LoadBalancer() (LoadBalancer, bool)
// Instances returns an instances interface. Also returns true if the interface is supported, false otherwise. // Instances returns an instances interface. Also returns true if the interface is supported, false otherwise.
Instances() (Instances, bool) Instances() (Instances, bool)
// Zones returns a zones interface. Also returns true if the interface is supported, false otherwise. // Zones returns a zones interface. Also returns true if the interface is supported, false otherwise.
@ -76,23 +76,23 @@ func GetInstanceProviderID(cloud Interface, nodeName string) (string, error) {
return cloud.ProviderName() + "://" + instanceID, nil return cloud.ProviderName() + "://" + instanceID, nil
} }
// TCPLoadBalancer is an abstract, pluggable interface for TCP load balancers. // LoadBalancer is an abstract, pluggable interface for load balancers.
type TCPLoadBalancer interface { type LoadBalancer interface {
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
// GetTCPLoadBalancer returns whether the specified load balancer exists, and // GetLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is. // if so, what its status is.
GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
// EnsureTCPLoadBalancer creates a new tcp load balancer, or updates an existing one. Returns the status of the balancer // EnsureLoadBalancer creates a new load balancer, or updates an existing one. Returns the status of the balancer
EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
// UpdateTCPLoadBalancer updates hosts under the specified load balancer. // UpdateLoadBalancer updates hosts under the specified load balancer.
UpdateTCPLoadBalancer(name, region string, hosts []string) error UpdateLoadBalancer(name, region string, hosts []string) error
// EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it // EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or // exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted. // was successfully deleted.
// This construction is useful because many cloud providers' load balancers // This construction is useful because many cloud providers' load balancers
// have multiple underlying components, meaning a Get could say that the LB // have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around. // doesn't exist even if some part of it is still laying around.
EnsureTCPLoadBalancerDeleted(name, region string) error EnsureLoadBalancerDeleted(name, region string) error
} }
// Instances is an abstract, pluggable interface for sets of instances. // Instances is an abstract, pluggable interface for sets of instances.

View File

@ -179,7 +179,7 @@ type InstanceGroupInfo interface {
CurrentSize() (int, error) CurrentSize() (int, error)
} }
// AWSCloud is an implementation of Interface, TCPLoadBalancer and Instances for Amazon Web Services. // AWSCloud is an implementation of Interface, LoadBalancer and Instances for Amazon Web Services.
type AWSCloud struct { type AWSCloud struct {
ec2 EC2 ec2 EC2
elb ELB elb ELB
@ -619,8 +619,8 @@ func (aws *AWSCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, searches return nameservers, searches
} }
// TCPLoadBalancer returns an implementation of TCPLoadBalancer for Amazon Web Services. // LoadBalancer returns an implementation of LoadBalancer for Amazon Web Services.
func (s *AWSCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (s *AWSCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return s, true return s, true
} }
@ -1695,8 +1695,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) {
// EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer // EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway. // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts)
if region != s.region { if region != s.region {
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
@ -1707,6 +1707,15 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
} }
if len(ports) == 0 {
return nil, fmt.Errorf("requested load balancer with no ports")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB")
}
if publicIP != nil { if publicIP != nil {
return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB") return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB")
} }
@ -1812,8 +1821,8 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
return status, nil return status, nil
} }
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (s *AWSCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (s *AWSCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
if region != s.region { if region != s.region {
return nil, false, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) return nil, false, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
} }
@ -1976,8 +1985,8 @@ func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalan
return nil return nil
} }
// EnsureTCPLoadBalancerDeleted implements TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. // EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted.
func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
if region != s.region { if region != s.region {
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
} }
@ -2070,8 +2079,8 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
return nil return nil
} }
// UpdateTCPLoadBalancer implements TCPLoadBalancer.UpdateTCPLoadBalancer // UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { func (s *AWSCloud) UpdateLoadBalancer(name, region string, hosts []string) error {
if region != s.region { if region != s.region {
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
} }

View File

@ -693,24 +693,24 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) {
badELBRegion := "bad-elb-region" badELBRegion := "bad-elb-region"
errorMessage := fmt.Sprintf("requested load balancer region '%s' does not match cluster region '%s'", badELBRegion, c.region) errorMessage := fmt.Sprintf("requested load balancer region '%s' does not match cluster region '%s'", badELBRegion, c.region)
_, _, err = c.GetTCPLoadBalancer("elb-name", badELBRegion) _, _, err = c.GetLoadBalancer("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage { if err == nil || err.Error() != errorMessage {
t.Errorf("Expected GetTCPLoadBalancer region mismatch error.") t.Errorf("Expected GetLoadBalancer region mismatch error.")
} }
_, err = c.EnsureTCPLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone) _, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone)
if err == nil || err.Error() != errorMessage { if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureTCPLoadBalancer region mismatch error.") t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
} }
err = c.EnsureTCPLoadBalancerDeleted("elb-name", badELBRegion) err = c.EnsureLoadBalancerDeleted("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage { if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureTCPLoadBalancerDeleted region mismatch error.") t.Errorf("Expected EnsureLoadBalancerDeleted region mismatch error.")
} }
err = c.UpdateTCPLoadBalancer("elb-name", badELBRegion, nil) err = c.UpdateLoadBalancer("elb-name", badELBRegion, nil)
if err == nil || err.Error() != errorMessage { if err == nil || err.Error() != errorMessage {
t.Errorf("Expected UpdateTCPLoadBalancer region mismatch error.") t.Errorf("Expected UpdateLoadBalancer region mismatch error.")
} }
} }

View File

@ -15,5 +15,5 @@ limitations under the License.
*/ */
// Package fake is a test-double implementation of cloudprovider // Package fake is a test-double implementation of cloudprovider
// Interface, TCPLoadBalancer and Instances. It is useful for testing. // Interface, LoadBalancer and Instances. It is useful for testing.
package fake package fake

View File

@ -44,7 +44,7 @@ type FakeUpdateBalancerCall struct {
Hosts []string Hosts []string
} }
// FakeCloud is a test-double implementation of Interface, TCPLoadBalancer, Instances, and Routes. It is useful for testing. // FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing.
type FakeCloud struct { type FakeCloud struct {
Exists bool Exists bool
Err error Err error
@ -99,9 +99,9 @@ func (f *FakeCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []s
return nameservers, searches return nameservers, searches
} }
// TCPLoadBalancer returns a fake implementation of TCPLoadBalancer. // LoadBalancer returns a fake implementation of LoadBalancer.
// Actually it just returns f itself. // Actually it just returns f itself.
func (f *FakeCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (f *FakeCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return f, true return f, true
} }
@ -120,17 +120,17 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) {
return f, true return f, true
} }
// GetTCPLoadBalancer is a stub implementation of TCPLoadBalancer.GetTCPLoadBalancer. // GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer.
func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
return status, f.Exists, f.Err return status, f.Exists, f.Err
} }
// EnsureTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
// It adds an entry "create" into the internal method call record. // It adds an entry "create" into the internal method call record.
func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
f.addCall("create") f.addCall("create")
if f.Balancers == nil { if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer) f.Balancers = make(map[string]FakeBalancer)
@ -143,17 +143,17 @@ func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP
return status, f.Err return status, f.Err
} }
// UpdateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.UpdateTCPLoadBalancer. // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
// It adds an entry "update" into the internal method call record. // It adds an entry "update" into the internal method call record.
func (f *FakeCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { func (f *FakeCloud) UpdateLoadBalancer(name, region string, hosts []string) error {
f.addCall("update") f.addCall("update")
f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts}) f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts})
return f.Err return f.Err
} }
// EnsureTCPLoadBalancerDeleted is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. // EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted.
// It adds an entry "delete" into the internal method call record. // It adds an entry "delete" into the internal method call record.
func (f *FakeCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { func (f *FakeCloud) EnsureLoadBalancerDeleted(name, region string) error {
f.addCall("delete") f.addCall("delete")
return f.Err return f.Err
} }

View File

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// Package gce is an implementation of Interface, TCPLoadBalancer // Package gce is an implementation of Interface, LoadBalancer
// and Instances for Google Compute Engine. // and Instances for Google Compute Engine.
package gce package gce

View File

@ -60,7 +60,7 @@ const (
operationPollTimeoutDuration = 30 * time.Minute operationPollTimeoutDuration = 30 * time.Minute
) )
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine. // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
type GCECloud struct { type GCECloud struct {
service *compute.Service service *compute.Service
containerService *container.Service containerService *container.Service
@ -253,8 +253,8 @@ func (gce *GCECloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, srchOut return nameservers, srchOut
} }
// TCPLoadBalancer returns an implementation of TCPLoadBalancer for Google Compute Engine. // LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine.
func (gce *GCECloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (gce *GCECloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return gce, true return gce, true
} }
@ -350,8 +350,8 @@ func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
}) })
} }
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (gce *GCECloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err == nil { if err == nil {
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
@ -370,16 +370,18 @@ func isHTTPErrorCode(err error, code int) bool {
return ok && apiErr.Code == code return ok && apiErr.Code == code
} }
// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer.
// Our load balancers in GCE consist of four separate GCE resources - a static // Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This // IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them. // function has to manage all of them.
// Due to an interesting series of design decisions, this handles both creating // Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when // new load balancers and updating existing load balancers, recognizing when
// each is needed. // each is needed.
func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, ports, hosts)
if len(hosts) == 0 { if len(hosts) == 0 {
return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts") return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
} }
// Check if the forwarding rule exists, and if so, what its IP is. // Check if the forwarding rule exists, and if so, what its IP is.
@ -426,7 +428,7 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.
// Deal with the firewall next. The reason we do this here rather than last // Deal with the firewall next. The reason we do this here rather than last
// is because the forwarding rule is used as the indicator that the load // is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getTCPLoadBalancer checks for. // balancer is fully created - it's what getLoadBalancer checks for.
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports) firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports)
if err != nil { if err != nil {
return nil, err return nil, err
@ -515,6 +517,11 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP
if portRange != fwd.PortRange { if portRange != fwd.PortRange {
return true, true, fwd.IPAddress, nil return true, true, fwd.IPAddress, nil
} }
// The service controller verified all the protocols match on the ports, just check the first one
if string(ports[0].Protocol) != fwd.IPProtocol {
return true, true, fwd.IPAddress, nil
}
return true, false, fwd.IPAddress, nil return true, false, fwd.IPAddress, nil
} }
@ -522,6 +529,12 @@ func loadBalancerPortRange(ports []*api.ServicePort) (string, error) {
if len(ports) == 0 { if len(ports) == 0 {
return "", fmt.Errorf("no ports specified for GCE load balancer") return "", fmt.Errorf("no ports specified for GCE load balancer")
} }
// The service controller verified all the protocols match on the ports, just check and use the first one
if ports[0].Protocol != api.ProtocolTCP && ports[0].Protocol != api.ProtocolUDP {
return "", fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(ports[0].Protocol))
}
minPort := 65536 minPort := 65536
maxPort := 0 maxPort := 0
for i := range ports { for i := range ports {
@ -575,7 +588,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [
if fw.Description != makeFirewallDescription(ipAddress) { if fw.Description != makeFirewallDescription(ipAddress) {
return true, true, nil return true, true, nil
} }
if len(fw.Allowed) != 1 || fw.Allowed[0].IPProtocol != "tcp" { if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") {
return true, true, nil return true, true, nil
} }
// Make sure the allowed ports match. // Make sure the allowed ports match.
@ -586,6 +599,8 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [
if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) { if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil return true, true, nil
} }
// The service controller already verified that the protocol matches on all ports, no need to check.
return true, false, nil return true, false, nil
} }
@ -617,11 +632,10 @@ func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports
return err return err
} }
req := &compute.ForwardingRule{ req := &compute.ForwardingRule{
Name: name, Name: name,
IPAddress: ipAddress, IPAddress: ipAddress, IPProtocol: string(ports[0].Protocol),
IPProtocol: "TCP", PortRange: portRange,
PortRange: portRange, Target: gce.targetPoolURL(name, region),
Target: gce.targetPoolURL(name, region),
} }
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
@ -713,7 +727,7 @@ func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api
TargetTags: hostTags, TargetTags: hostTags,
Allowed: []*compute.FirewallAllowed{ Allowed: []*compute.FirewallAllowed{
{ {
IPProtocol: "tcp", IPProtocol: strings.ToLower(string(ports[0].Protocol)),
Ports: allowedPorts, Ports: allowedPorts,
}, },
}, },
@ -803,8 +817,8 @@ func (gce *GCECloud) createOrPromoteStaticIP(name, region, existingIP string) (i
return address.Address, nil return address.Address, nil
} }
// UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer. // UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { func (gce *GCECloud) UpdateLoadBalancer(name, region string, hosts []string) error {
pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
if err != nil { if err != nil {
return err return err
@ -864,12 +878,12 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)
return nil return nil
} }
// EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. // EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error {
err := utilerrors.AggregateGoroutines( err := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(name, region) }, func() error { return gce.deleteFirewall(name, region) },
// Even though we don't hold on to static IPs for load balancers, it's // Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureTCPLoadBalancer left one around in a failed // possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case. // creation/update attempt, so make sure we clean it up here just in case.
func() error { return gce.deleteStaticIP(name, region) }, func() error { return gce.deleteStaticIP(name, region) },
func() error { func() error {

View File

@ -102,10 +102,10 @@ func (c *MesosCloud) Instances() (cloudprovider.Instances, bool) {
return c, true return c, true
} }
// TCPLoadBalancer always returns nil, false in this implementation. // LoadBalancer always returns nil, false in this implementation.
// Mesos does not provide any type of native load balancing by default, // Mesos does not provide any type of native load balancing by default,
// so this implementation always returns (nil, false). // so this implementation always returns (nil, false).
func (c *MesosCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (c *MesosCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false return nil, false
} }

View File

@ -121,15 +121,15 @@ func Test_Instances(t *testing.T) {
} }
} }
// test mesos.TCPLoadBalancer // test mesos.LoadBalancer
func Test_TcpLoadBalancer(t *testing.T) { func Test_TcpLoadBalancer(t *testing.T) {
defer log.Flush() defer log.Flush()
mesosCloud, _ := newMesosCloud(nil) mesosCloud, _ := newMesosCloud(nil)
lb, supports_lb := mesosCloud.TCPLoadBalancer() lb, supports_lb := mesosCloud.LoadBalancer()
if supports_lb || lb != nil { if supports_lb || lb != nil {
t.Fatalf("MesosCloud does not provide an implementation of TCPLoadBalancer") t.Fatalf("MesosCloud does not provide an implementation of LoadBalancer")
} }
} }

View File

@ -495,8 +495,8 @@ type LoadBalancer struct {
opts LoadBalancerOpts opts LoadBalancerOpts
} }
func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (os *OpenStack) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
glog.V(4).Info("openstack.TCPLoadBalancer() called") glog.V(4).Info("openstack.LoadBalancer() called")
// TODO: Search for and support Rackspace loadbalancer API, and others. // TODO: Search for and support Rackspace loadbalancer API, and others.
network, err := openstack.NewNetworkV2(os.provider, gophercloud.EndpointOpts{ network, err := openstack.NewNetworkV2(os.provider, gophercloud.EndpointOpts{
@ -515,7 +515,7 @@ func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
return nil, false return nil, false
} }
glog.V(1).Info("Claiming to support TCPLoadBalancer") glog.V(1).Info("Claiming to support LoadBalancer")
return &LoadBalancer{network, compute, os.lbOpts}, true return &LoadBalancer{network, compute, os.lbOpts}, true
} }
@ -630,7 +630,7 @@ func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*f
return &floatingIPList[0], nil return &floatingIPList[0], nil
} }
func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
vip, err := getVipByName(lb.network, name) vip, err := getVipByName(lb.network, name)
if err == ErrNotFound { if err == ErrNotFound {
return nil, false, nil return nil, false, nil
@ -650,11 +650,18 @@ func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalanc
// a list of regions (from config) and query/create loadbalancers in // a list of regions (from config) and query/create loadbalancers in
// each region. // each region.
func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity) glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity)
if len(ports) > 1 { if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
} }
var persistence *vips.SessionPersistence var persistence *vips.SessionPersistence
@ -668,7 +675,7 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI
} }
glog.V(2).Info("Checking if openstack load balancer already exists: %s", name) glog.V(2).Info("Checking if openstack load balancer already exists: %s", name)
_, exists, err := lb.GetTCPLoadBalancer(name, region) _, exists, err := lb.GetLoadBalancer(name, region)
if err != nil { if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
} }
@ -676,7 +683,7 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI
// TODO: Implement a more efficient update strategy for common changes than delete & create // TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts // In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists { if exists {
err := lb.EnsureTCPLoadBalancerDeleted(name, region) err := lb.EnsureLoadBalancerDeleted(name, region)
if err != nil { if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
} }
@ -777,8 +784,8 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI
} }
func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error { func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string) error {
glog.V(4).Infof("UpdateTCPLoadBalancer(%v, %v, %v)", name, region, hosts) glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", name, region, hosts)
vip, err := getVipByName(lb.network, name) vip, err := getVipByName(lb.network, name)
if err != nil { if err != nil {
@ -838,8 +845,8 @@ func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []strin
return nil return nil
} }
func (lb *LoadBalancer) EnsureTCPLoadBalancerDeleted(name, region string) error { func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error {
glog.V(4).Infof("EnsureTCPLoadBalancerDeleted(%v, %v)", name, region) glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", name, region)
vip, err := getVipByName(lb.network, name) vip, err := getVipByName(lb.network, name)
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {

View File

@ -151,7 +151,7 @@ func TestInstances(t *testing.T) {
t.Logf("Found NodeAddresses(%s) = %s\n", srvs[0], addrs) t.Logf("Found NodeAddresses(%s) = %s\n", srvs[0], addrs)
} }
func TestTCPLoadBalancer(t *testing.T) { func TestLoadBalancer(t *testing.T) {
cfg, ok := configFromEnv() cfg, ok := configFromEnv()
if !ok { if !ok {
t.Skipf("No config found in environment") t.Skipf("No config found in environment")
@ -162,17 +162,17 @@ func TestTCPLoadBalancer(t *testing.T) {
t.Fatalf("Failed to construct/authenticate OpenStack: %s", err) t.Fatalf("Failed to construct/authenticate OpenStack: %s", err)
} }
lb, ok := os.TCPLoadBalancer() lb, ok := os.LoadBalancer()
if !ok { if !ok {
t.Fatalf("TCPLoadBalancer() returned false - perhaps your stack doesn't support Neutron?") t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
} }
_, exists, err := lb.GetTCPLoadBalancer("noexist", "region") _, exists, err := lb.GetLoadBalancer("noexist", "region")
if err != nil { if err != nil {
t.Fatalf("GetTCPLoadBalancer(\"noexist\") returned error: %s", err) t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
} }
if exists { if exists {
t.Fatalf("GetTCPLoadBalancer(\"noexist\") returned exists") t.Fatalf("GetLoadBalancer(\"noexist\") returned exists")
} }
} }

View File

@ -128,8 +128,8 @@ func (v *OVirtCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, searches return nameservers, searches
} }
// TCPLoadBalancer returns an implementation of TCPLoadBalancer for oVirt cloud // LoadBalancer returns an implementation of LoadBalancer for oVirt cloud
func (v *OVirtCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (v *OVirtCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false return nil, false
} }

View File

@ -367,7 +367,7 @@ func (os *Rackspace) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, searches return nameservers, searches
} }
func (os *Rackspace) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (os *Rackspace) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false return nil, false
} }

View File

@ -69,7 +69,7 @@ type ServiceController struct {
cloud cloudprovider.Interface cloud cloudprovider.Interface
kubeClient client.Interface kubeClient client.Interface
clusterName string clusterName string
balancer cloudprovider.TCPLoadBalancer balancer cloudprovider.LoadBalancer
zone cloudprovider.Zone zone cloudprovider.Zone
cache *serviceCache cache *serviceCache
eventBroadcaster record.EventBroadcaster eventBroadcaster record.EventBroadcaster
@ -150,9 +150,9 @@ func (s *ServiceController) init() error {
return fmt.Errorf("ServiceController should not be run without a cloudprovider.") return fmt.Errorf("ServiceController should not be run without a cloudprovider.")
} }
balancer, ok := s.cloud.TCPLoadBalancer() balancer, ok := s.cloud.LoadBalancer()
if !ok { if !ok {
return fmt.Errorf("the cloud provider does not support TCP load balancers.") return fmt.Errorf("the cloud provider does not support external load balancers.")
} }
s.balancer = balancer s.balancer = balancer
@ -256,7 +256,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
s.cache.set(namespacedName.String(), cachedService) s.cache.set(namespacedName.String(), cachedService)
case cache.Deleted: case cache.Deleted:
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region)
if err != nil { if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error() message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message) s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
@ -278,7 +278,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
return nil, notRetryable return nil, notRetryable
} }
// Note: It is safe to just call EnsureTCPLoadBalancer. But, on some clouds that requires a delete & create, // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
// which may involve service interruption. Also, we would like user-friendly events. // which may involve service interruption. Also, we would like user-friendly events.
// Save the state so we can avoid a write if it doesn't change // Save the state so we can avoid a write if it doesn't change
@ -293,8 +293,8 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
} else { } else {
// If we don't have any cached memory of the load balancer, we have to ask // If we don't have any cached memory of the load balancer, we have to ask
// the cloud provider for what it knows about it. // the cloud provider for what it knows about it.
// Technically EnsureTCPLoadBalancerDeleted can cope, but we want to post meaningful events // Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
_, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region) _, exists, err := s.balancer.GetLoadBalancer(s.loadBalancerName(service), s.zone.Region)
if err != nil { if err != nil {
return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
} }
@ -306,7 +306,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
if needDelete { if needDelete {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName) glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { if err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil {
return err, retryable return err, retryable
} }
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
@ -378,7 +378,9 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
return err return err
} }
name := s.loadBalancerName(service) name := s.loadBalancerName(service)
status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP), // getPortsForLB already verified that the protocol matches for all ports.
// The cloud provider will verify the protocol is supported
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
if err != nil { if err != nil {
return err return err
@ -482,15 +484,18 @@ func (s *ServiceController) loadBalancerName(service *api.Service) string {
} }
func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
var protocol api.Protocol
ports := []*api.ServicePort{} ports := []*api.ServicePort{}
for i := range service.Spec.Ports { for i := range service.Spec.Ports {
// TODO: Support UDP. Remove the check from the API validation package once
// it's supported.
sp := &service.Spec.Ports[i] sp := &service.Spec.Ports[i]
if sp.Protocol != api.ProtocolTCP { // The check on protocol was removed here. The cloud provider itself is now responsible for all protocol validation
return nil, fmt.Errorf("load balancers for non TCP services are not currently supported.")
}
ports = append(ports, sp) ports = append(ports, sp)
if protocol == "" {
protocol = sp.Protocol
} else if protocol != sp.Protocol && wantsLoadBalancer(service) {
return nil, fmt.Errorf("mixed protocol external load balancers are not supported.")
}
} }
return ports, nil return ports, nil
} }
@ -667,7 +672,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h
return return
} }
if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil { if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil {
glog.Errorf("External error while updating TCP load balancer: %v.", err) glog.Errorf("External error while updating load balancer: %v.", err)
servicesToRetry = append(servicesToRetry, service) servicesToRetry = append(servicesToRetry, service)
} }
}() }()
@ -684,15 +689,15 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event // This operation doesn't normally take very long (and happens pretty often), so we only record the final event
name := cloudprovider.GetLoadBalancerName(service) name := cloudprovider.GetLoadBalancerName(service)
err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts) err := s.balancer.UpdateLoadBalancer(name, s.zone.Region, hosts)
if err == nil { if err == nil {
s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts") s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
return nil return nil
} }
// It's only an actual error if the load balancer still exists. // It's only an actual error if the load balancer still exists.
if _, exists, err := s.balancer.GetTCPLoadBalancer(name, s.zone.Region); err != nil { if _, exists, err := s.balancer.GetLoadBalancer(name, s.zone.Region); err != nil {
glog.Errorf("External error while checking if TCP load balancer %q exists: name, %v", name, err) glog.Errorf("External error while checking if load balancer %q exists: name, %v", name, err)
} else if !exists { } else if !exists {
return nil return nil
} }

View File

@ -65,8 +65,8 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
Type: api.ServiceTypeLoadBalancer, Type: api.ServiceTypeLoadBalancer,
}, },
}, },
expectErr: true, expectErr: false,
expectCreateAttempt: false, expectCreateAttempt: true,
}, },
{ {
service: &api.Service{ service: &api.Service{

View File

@ -162,10 +162,10 @@ var _ = Describe("Cluster Upgrade [Skipped]", func() {
}) })
f := NewFramework("cluster-upgrade") f := NewFramework("cluster-upgrade")
var w *WebserverTest var w *ServerTest
BeforeEach(func() { BeforeEach(func() {
By("Setting up the service, RC, and pods") By("Setting up the service, RC, and pods")
w = NewWebserverTest(f.Client, f.Namespace.Name, svcName) w = NewServerTest(f.Client, f.Namespace.Name, svcName)
rc := w.CreateWebserverRC(replicas) rc := w.CreateWebserverRC(replicas)
rcName = rc.ObjectMeta.Name rcName = rc.ObjectMeta.Name
svc := w.BuildServiceSpec() svc := w.BuildServiceSpec()
@ -200,6 +200,38 @@ var _ = Describe("Cluster Upgrade [Skipped]", func() {
BeforeEach(func() { BeforeEach(func() {
SkipUnlessProviderIs("gce") SkipUnlessProviderIs("gce")
}) })
// The version is determined once at the beginning of the test so that
// the master and nodes won't be skewed if the value changes during the
// test.
By(fmt.Sprintf("Getting real version for %q", testContext.UpgradeTarget))
var err error
v, err = realVersion(testContext.UpgradeTarget)
expectNoError(err)
Logf("Version for %q is %q", testContext.UpgradeTarget, v)
By("Setting up the service, RC, and pods")
f.beforeEach()
w = NewServerTest(f.Client, f.Namespace.Name, svcName)
rc := w.CreateWebserverRC(replicas)
rcName = rc.ObjectMeta.Name
svc := w.BuildServiceSpec()
svc.Spec.Type = api.ServiceTypeLoadBalancer
w.CreateService(svc)
By("Waiting for the service to become reachable")
result, err := waitForLoadBalancerIngress(f.Client, svcName, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
ingresses := result.Status.LoadBalancer.Ingress
if len(ingresses) != 1 {
Failf("Was expecting only 1 ingress IP but got %d (%v): %v", len(ingresses), ingresses, result)
}
ingress = ingresses[0]
Logf("Got load balancer ingress point %v", ingress)
ip = ingress.IP
if ip == "" {
ip = ingress.Hostname
}
testLoadBalancerReachable(ingress, 80)
It("of master should maintain responsive services", func() { It("of master should maintain responsive services", func() {
By("Validating cluster before master upgrade") By("Validating cluster before master upgrade")

View File

@ -177,7 +177,7 @@ func createApp(c *client.Client, ns string, i int) {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Logf("Creating rc %v", name) Logf("Creating rc %v", name)
rc := rcByNamePort(name, 1, testImage, httpContainerPort, l) rc := rcByNamePort(name, 1, testImage, httpContainerPort, api.ProtocolTCP, l)
rc.Spec.Template.Spec.Containers[0].Args = []string{ rc.Spec.Template.Spec.Containers[0].Args = []string{
"--num=1", "--num=1",
fmt.Sprintf("--start=%d", i), fmt.Sprintf("--start=%d", i),

View File

@ -173,11 +173,11 @@ func rcByName(name string, replicas int, image string, labels map[string]string)
}) })
} }
func rcByNamePort(name string, replicas int, image string, port int, labels map[string]string) *api.ReplicationController { func rcByNamePort(name string, replicas int, image string, port int, protocol api.Protocol, labels map[string]string) *api.ReplicationController {
return rcByNameContainer(name, replicas, image, labels, api.Container{ return rcByNameContainer(name, replicas, image, labels, api.Container{
Name: name, Name: name,
Image: image, Image: image,
Ports: []api.ContainerPort{{ContainerPort: port}}, Ports: []api.ContainerPort{{ContainerPort: port, Protocol: protocol}},
}) })
} }
@ -215,7 +215,7 @@ func rcByNameContainer(name string, replicas int, image string, labels map[strin
func newRCByName(c *client.Client, ns, name string, replicas int) (*api.ReplicationController, error) { func newRCByName(c *client.Client, ns, name string, replicas int) (*api.ReplicationController, error) {
By(fmt.Sprintf("creating replication controller %s", name)) By(fmt.Sprintf("creating replication controller %s", name))
return c.ReplicationControllers(ns).Create(rcByNamePort( return c.ReplicationControllers(ns).Create(rcByNamePort(
name, replicas, serveHostnameImage, 9376, map[string]string{})) name, replicas, serveHostnameImage, 9376, api.ProtocolTCP, map[string]string{}))
} }
func resizeRC(c *client.Client, ns, name string, replicas int) error { func resizeRC(c *client.Client, ns, name string, replicas int) error {

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"net"
"net/http" "net/http"
"sort" "sort"
"strconv" "strconv"
@ -354,7 +355,7 @@ var _ = Describe("Services", func() {
serviceName := "nodeportservice-test" serviceName := "nodeportservice-test"
ns := f.Namespace.Name ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName) t := NewServerTest(c, ns, serviceName)
defer func() { defer func() {
defer GinkgoRecover() defer GinkgoRecover()
errs := t.Cleanup() errs := t.Cleanup()
@ -410,7 +411,7 @@ var _ = Describe("Services", func() {
serviceName := "mutability-service-test" serviceName := "mutability-service-test"
t := NewWebserverTest(f.Client, f.Namespace.Name, serviceName) t := NewServerTest(f.Client, f.Namespace.Name, serviceName)
defer func() { defer func() {
defer GinkgoRecover() defer GinkgoRecover()
errs := t.Cleanup() errs := t.Cleanup()
@ -609,7 +610,7 @@ var _ = Describe("Services", func() {
serviceName2 := baseName + "2" serviceName2 := baseName + "2"
ns := f.Namespace.Name ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName1) t := NewServerTest(c, ns, serviceName1)
defer func() { defer func() {
defer GinkgoRecover() defer GinkgoRecover()
errs := t.Cleanup() errs := t.Cleanup()
@ -661,7 +662,7 @@ var _ = Describe("Services", func() {
serviceName := "nodeport-range-test" serviceName := "nodeport-range-test"
ns := f.Namespace.Name ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName) t := NewServerTest(c, ns, serviceName)
defer func() { defer func() {
defer GinkgoRecover() defer GinkgoRecover()
errs := t.Cleanup() errs := t.Cleanup()
@ -727,7 +728,7 @@ var _ = Describe("Services", func() {
serviceName := "nodeport-reuse" serviceName := "nodeport-reuse"
ns := f.Namespace.Name ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName) t := NewServerTest(c, ns, serviceName)
defer func() { defer func() {
defer GinkgoRecover() defer GinkgoRecover()
errs := t.Cleanup() errs := t.Cleanup()
@ -795,7 +796,7 @@ var _ = Describe("Services", func() {
servicePort := 9376 servicePort := 9376
By("creating service " + serviceName + " with load balancer in namespace " + ns1) By("creating service " + serviceName + " with load balancer in namespace " + ns1)
t1 := NewWebserverTest(c, ns1, serviceName) t1 := NewServerTest(c, ns1, serviceName)
svc1 := t1.BuildServiceSpec() svc1 := t1.BuildServiceSpec()
svc1.Spec.Type = api.ServiceTypeLoadBalancer svc1.Spec.Type = api.ServiceTypeLoadBalancer
svc1.Spec.Ports[0].Port = servicePort svc1.Spec.Ports[0].Port = servicePort
@ -819,18 +820,20 @@ var _ = Describe("Services", func() {
}() }()
} }
By("creating service " + serviceName + " with load balancer in namespace " + ns2) By("creating service " + serviceName + " with UDP load balancer in namespace " + ns2)
t2 := NewWebserverTest(c, ns2, serviceName) t2 := NewNetcatTest(c, ns2, serviceName)
svc2 := t2.BuildServiceSpec() svc2 := t2.BuildServiceSpec()
svc2.Spec.Type = api.ServiceTypeLoadBalancer svc2.Spec.Type = api.ServiceTypeLoadBalancer
svc2.Spec.Ports[0].Port = servicePort svc2.Spec.Ports[0].Port = servicePort
// Let this one be UDP so that we can test that as well without an additional test
svc2.Spec.Ports[0].Protocol = api.ProtocolUDP
svc2.Spec.Ports[0].TargetPort = intstr.FromInt(80) svc2.Spec.Ports[0].TargetPort = intstr.FromInt(80)
svc2.Spec.LoadBalancerIP = loadBalancerIP svc2.Spec.LoadBalancerIP = loadBalancerIP
_, err = t2.CreateService(svc2) _, err = t2.CreateService(svc2)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("creating pod to be part of service " + serviceName + " in namespace " + ns2) By("creating pod to be part of service " + serviceName + " in namespace " + ns2)
t2.CreateWebserverRC(2) t2.CreateNetcatRC(2)
ingressPoints := []string{} ingressPoints := []string{}
svcs := []*api.Service{svc1, svc2} svcs := []*api.Service{svc1, svc2}
@ -865,11 +868,19 @@ var _ = Describe("Services", func() {
} }
ingressPoints = append(ingressPoints, ing) // Save 'em to check uniqueness ingressPoints = append(ingressPoints, ing) // Save 'em to check uniqueness
By("hitting the pod through the service's NodePort") if svc1.Spec.Ports[0].Protocol == api.ProtocolTCP {
testReachable(pickNodeIP(c), port.NodePort) By("hitting the pod through the service's NodePort")
testReachable(pickNodeIP(c), port.NodePort)
By("hitting the pod through the service's external load balancer") By("hitting the pod through the service's external load balancer")
testLoadBalancerReachable(ingress, servicePort) testLoadBalancerReachable(ingress, servicePort)
} else {
By("hitting the pod through the service's NodePort")
testNetcatReachable(pickNodeIP(c), port.NodePort)
By("hitting the pod through the service's external load balancer")
testNetcatLoadBalancerReachable(ingress, servicePort)
}
} }
validateUniqueOrFail(ingressPoints) validateUniqueOrFail(ingressPoints)
}) })
@ -1160,6 +1171,15 @@ func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool {
return testLoadBalancerReachableInTime(ingress, port, podStartTimeout) return testLoadBalancerReachableInTime(ingress, port, podStartTimeout)
} }
func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) {
ip := ingress.IP
if ip == "" {
ip = ingress.Hostname
}
testNetcatReachable(ip, port)
}
func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool { func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool {
ip := ingress.IP ip := ingress.IP
if ip == "" { if ip == "" {
@ -1182,6 +1202,31 @@ func testReachable(ip string, port int) bool {
return testReachableInTime(ip, port, podStartTimeout) return testReachableInTime(ip, port, podStartTimeout)
} }
func testNetcatReachable(ip string, port int) {
con, err := net.Dial("udp", ip+":"+string(port))
if err != nil {
Failf("Failed to connect to: %s:%d (%s)", ip, port, err.Error())
}
_, err = con.Write([]byte("\n"))
if err != nil {
Failf("Failed to send newline: %s", err.Error())
}
var buf []byte = make([]byte, len("SUCCESS")+1)
_, err = con.Read(buf)
if err != nil {
Failf("Failed to read result: %s", err.Error())
}
if !strings.HasPrefix(string(buf), "SUCCESS") {
Failf("Failed to retrieve: \"SUCCESS\"")
}
Logf("Successfully retrieved \"SUCCESS\"")
}
func testReachableInTime(ip string, port int, timeout time.Duration) bool { func testReachableInTime(ip string, port int, timeout time.Duration) bool {
url := fmt.Sprintf("http://%s:%d", ip, port) url := fmt.Sprintf("http://%s:%d", ip, port)
if ip == "" { if ip == "" {
@ -1427,7 +1472,7 @@ func httpGetNoConnectionPool(url string) (*http.Response, error) {
} }
// Simple helper class to avoid too much boilerplate in tests // Simple helper class to avoid too much boilerplate in tests
type WebserverTest struct { type ServerTest struct {
ServiceName string ServiceName string
Namespace string Namespace string
Client *client.Client Client *client.Client
@ -1441,8 +1486,8 @@ type WebserverTest struct {
image string image string
} }
func NewWebserverTest(client *client.Client, namespace string, serviceName string) *WebserverTest { func NewServerTest(client *client.Client, namespace string, serviceName string) *ServerTest {
t := &WebserverTest{} t := &ServerTest{}
t.Client = client t.Client = client
t.Namespace = namespace t.Namespace = namespace
t.ServiceName = serviceName t.ServiceName = serviceName
@ -1460,8 +1505,27 @@ func NewWebserverTest(client *client.Client, namespace string, serviceName strin
return t return t
} }
func NewNetcatTest(client *client.Client, namespace string, serviceName string) *ServerTest {
t := &ServerTest{}
t.Client = client
t.Namespace = namespace
t.ServiceName = serviceName
t.TestId = t.ServiceName + "-" + string(util.NewUUID())
t.Labels = map[string]string{
"testid": t.TestId,
}
t.rcs = make(map[string]bool)
t.services = make(map[string]bool)
t.name = "netcat"
t.image = "ubuntu"
return t
}
// Build default config for a service (which can then be changed) // Build default config for a service (which can then be changed)
func (t *WebserverTest) BuildServiceSpec() *api.Service { func (t *ServerTest) BuildServiceSpec() *api.Service {
service := &api.Service{ service := &api.Service{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: t.ServiceName, Name: t.ServiceName,
@ -1480,8 +1544,24 @@ func (t *WebserverTest) BuildServiceSpec() *api.Service {
// CreateWebserverRC creates rc-backed pods with the well-known webserver // CreateWebserverRC creates rc-backed pods with the well-known webserver
// configuration and records it for cleanup. // configuration and records it for cleanup.
func (t *WebserverTest) CreateWebserverRC(replicas int) *api.ReplicationController { func (t *ServerTest) CreateWebserverRC(replicas int) *api.ReplicationController {
rcSpec := rcByNamePort(t.name, replicas, t.image, 80, t.Labels) rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolTCP, t.Labels)
rcAct, err := t.createRC(rcSpec)
if err != nil {
Failf("Failed to create rc %s: %v", rcSpec.Name, err)
}
if err := verifyPods(t.Client, t.Namespace, t.name, false, replicas); err != nil {
Failf("Failed to create %d pods with name %s: %v", replicas, t.name, err)
}
return rcAct
}
// CreateNetcatRC creates rc-backed pods with a netcat listener
// configuration and records it for cleanup.
func (t *ServerTest) CreateNetcatRC(replicas int) *api.ReplicationController {
rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolUDP, t.Labels)
rcSpec.Spec.Template.Spec.Containers[0].Command = []string{"/bin/bash"}
rcSpec.Spec.Template.Spec.Containers[0].Args = []string{"-c", "echo SUCCESS | nc -q 0 -u -l 0.0.0.0 80"}
rcAct, err := t.createRC(rcSpec) rcAct, err := t.createRC(rcSpec)
if err != nil { if err != nil {
Failf("Failed to create rc %s: %v", rcSpec.Name, err) Failf("Failed to create rc %s: %v", rcSpec.Name, err)
@ -1493,7 +1573,7 @@ func (t *WebserverTest) CreateWebserverRC(replicas int) *api.ReplicationControll
} }
// createRC creates a replication controller and records it for cleanup. // createRC creates a replication controller and records it for cleanup.
func (t *WebserverTest) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) { func (t *ServerTest) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) {
rc, err := t.Client.ReplicationControllers(t.Namespace).Create(rc) rc, err := t.Client.ReplicationControllers(t.Namespace).Create(rc)
if err == nil { if err == nil {
t.rcs[rc.Name] = true t.rcs[rc.Name] = true
@ -1502,7 +1582,7 @@ func (t *WebserverTest) createRC(rc *api.ReplicationController) (*api.Replicatio
} }
// Create a service, and record it for cleanup // Create a service, and record it for cleanup
func (t *WebserverTest) CreateService(service *api.Service) (*api.Service, error) { func (t *ServerTest) CreateService(service *api.Service) (*api.Service, error) {
result, err := t.Client.Services(t.Namespace).Create(service) result, err := t.Client.Services(t.Namespace).Create(service)
if err == nil { if err == nil {
t.services[service.Name] = true t.services[service.Name] = true
@ -1511,7 +1591,7 @@ func (t *WebserverTest) CreateService(service *api.Service) (*api.Service, error
} }
// Delete a service, and remove it from the cleanup list // Delete a service, and remove it from the cleanup list
func (t *WebserverTest) DeleteService(serviceName string) error { func (t *ServerTest) DeleteService(serviceName string) error {
err := t.Client.Services(t.Namespace).Delete(serviceName) err := t.Client.Services(t.Namespace).Delete(serviceName)
if err == nil { if err == nil {
delete(t.services, serviceName) delete(t.services, serviceName)
@ -1519,7 +1599,7 @@ func (t *WebserverTest) DeleteService(serviceName string) error {
return err return err
} }
func (t *WebserverTest) Cleanup() []error { func (t *ServerTest) Cleanup() []error {
var errs []error var errs []error
for rcName := range t.rcs { for rcName := range t.rcs {
By("stopping RC " + rcName + " in namespace " + t.Namespace) By("stopping RC " + rcName + " in namespace " + t.Namespace)