From d399a8f8cc5c60da3780dd657f1a977a0989b99a Mon Sep 17 00:00:00 2001 From: Kenneth Shelton Date: Mon, 28 Sep 2015 13:57:58 -0700 Subject: [PATCH 1/2] * Added UDP LB support (for GCE) --- pkg/api/validation/validation.go | 4 +- pkg/api/validation/validation_test.go | 8 +- pkg/cloudprovider/cloud.go | 24 ++-- pkg/cloudprovider/providers/aws/aws.go | 31 +++-- pkg/cloudprovider/providers/aws/aws_test.go | 16 +-- pkg/cloudprovider/providers/fake/doc.go | 2 +- pkg/cloudprovider/providers/fake/fake.go | 22 +-- pkg/cloudprovider/providers/gce/doc.go | 2 +- pkg/cloudprovider/providers/gce/gce.go | 56 +++++--- pkg/cloudprovider/providers/mesos/mesos.go | 4 +- .../providers/mesos/mesos_test.go | 6 +- .../providers/openstack/openstack.go | 31 +++-- .../providers/openstack/openstack_test.go | 12 +- pkg/cloudprovider/providers/ovirt/ovirt.go | 4 +- .../providers/rackspace/rackspace.go | 2 +- pkg/controller/service/servicecontroller.go | 41 +++--- .../service/servicecontroller_test.go | 4 +- test/e2e/cluster_upgrade.go | 36 ++++- test/e2e/ingress.go | 2 +- test/e2e/resize_nodes.go | 6 +- test/e2e/service.go | 126 ++++++++++++++---- 21 files changed, 293 insertions(+), 146 deletions(-) diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index a23fb4a8b70..e46ae862bcf 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -1457,8 +1457,8 @@ func ValidateService(service *api.Service) field.ErrorList { portsPath := specPath.Child("ports") for i := range service.Spec.Ports { portPath := portsPath.Index(i) - if service.Spec.Ports[i].Protocol != api.ProtocolTCP { - allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "may not use protocols other than 'TCP' when `type` is 'LoadBalancer'")) + if !supportedPortProtocols.Has(string(service.Spec.Ports[i].Protocol)) { + allErrs = append(allErrs, validation.NewInvalidError(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports")) } } } diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index fef048c0e17..d452bdefed6 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -2095,20 +2095,20 @@ func TestValidateService(t *testing.T) { numErrs: 1, }, { - name: "invalid load balancer protocol 1", + name: "valid load balancer protocol UDP 1", tweakSvc: func(s *api.Service) { s.Spec.Type = api.ServiceTypeLoadBalancer s.Spec.Ports[0].Protocol = "UDP" }, - numErrs: 1, + numErrs: 0, }, { - name: "invalid load balancer protocol 2", + name: "valid load balancer protocol UDP 2", tweakSvc: func(s *api.Service) { s.Spec.Type = api.ServiceTypeLoadBalancer s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "UDP", TargetPort: intstr.FromInt(12345)}) }, - numErrs: 1, + numErrs: 0, }, { name: "valid 1", diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index e64ed5dc0c2..fc4f875499b 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -27,8 +27,8 @@ import ( // Interface is an abstract, pluggable interface for cloud providers. type Interface interface { - // TCPLoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise. - TCPLoadBalancer() (TCPLoadBalancer, bool) + // LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise. + LoadBalancer() (LoadBalancer, bool) // Instances returns an instances interface. Also returns true if the interface is supported, false otherwise. Instances() (Instances, bool) // Zones returns a zones interface. Also returns true if the interface is supported, false otherwise. @@ -76,23 +76,23 @@ func GetInstanceProviderID(cloud Interface, nodeName string) (string, error) { return cloud.ProviderName() + "://" + instanceID, nil } -// TCPLoadBalancer is an abstract, pluggable interface for TCP load balancers. -type TCPLoadBalancer interface { +// LoadBalancer is an abstract, pluggable interface for load balancers. +type LoadBalancer interface { // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service - // GetTCPLoadBalancer returns whether the specified load balancer exists, and + // GetLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. - GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) - // EnsureTCPLoadBalancer creates a new tcp load balancer, or updates an existing one. Returns the status of the balancer - EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) - // UpdateTCPLoadBalancer updates hosts under the specified load balancer. - UpdateTCPLoadBalancer(name, region string, hosts []string) error - // EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it + GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) + // EnsureLoadBalancer creates a new load balancer, or updates an existing one. Returns the status of the balancer + EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) + // UpdateLoadBalancer updates hosts under the specified load balancer. + UpdateLoadBalancer(name, region string, hosts []string) error + // EnsureLoadBalancerDeleted deletes the specified load balancer if it // exists, returning nil if the load balancer specified either didn't exist or // was successfully deleted. // This construction is useful because many cloud providers' load balancers // have multiple underlying components, meaning a Get could say that the LB // doesn't exist even if some part of it is still laying around. - EnsureTCPLoadBalancerDeleted(name, region string) error + EnsureLoadBalancerDeleted(name, region string) error } // Instances is an abstract, pluggable interface for sets of instances. diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 4023b776759..73629927573 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -179,7 +179,7 @@ type InstanceGroupInfo interface { CurrentSize() (int, error) } -// AWSCloud is an implementation of Interface, TCPLoadBalancer and Instances for Amazon Web Services. +// AWSCloud is an implementation of Interface, LoadBalancer and Instances for Amazon Web Services. type AWSCloud struct { ec2 EC2 elb ELB @@ -619,8 +619,8 @@ func (aws *AWSCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut [] return nameservers, searches } -// TCPLoadBalancer returns an implementation of TCPLoadBalancer for Amazon Web Services. -func (s *AWSCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { +// LoadBalancer returns an implementation of LoadBalancer for Amazon Web Services. +func (s *AWSCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return s, true } @@ -1695,8 +1695,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) { // EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway. -func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) +func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) if region != s.region { return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) @@ -1707,6 +1707,15 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) } + if len(ports) == 0 { + return nil, fmt.Errorf("requested load balancer with no ports") + } + + // The service controller verified all the protocols match on the ports, just check and use the first one + if ports[0].Protocol != api.ProtocolTCP { + return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB") + } + if publicIP != nil { return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB") } @@ -1812,8 +1821,8 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p return status, nil } -// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer -func (s *AWSCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { +// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer +func (s *AWSCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { if region != s.region { return nil, false, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) } @@ -1976,8 +1985,8 @@ func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalan return nil } -// EnsureTCPLoadBalancerDeleted implements TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. -func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { +// EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted. +func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error { if region != s.region { return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) } @@ -2070,8 +2079,8 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { return nil } -// UpdateTCPLoadBalancer implements TCPLoadBalancer.UpdateTCPLoadBalancer -func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { +// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer +func (s *AWSCloud) UpdateLoadBalancer(name, region string, hosts []string) error { if region != s.region { return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) } diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 91d72770d66..44d701d2347 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -693,24 +693,24 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) { badELBRegion := "bad-elb-region" errorMessage := fmt.Sprintf("requested load balancer region '%s' does not match cluster region '%s'", badELBRegion, c.region) - _, _, err = c.GetTCPLoadBalancer("elb-name", badELBRegion) + _, _, err = c.GetLoadBalancer("elb-name", badELBRegion) if err == nil || err.Error() != errorMessage { - t.Errorf("Expected GetTCPLoadBalancer region mismatch error.") + t.Errorf("Expected GetLoadBalancer region mismatch error.") } - _, err = c.EnsureTCPLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone) + _, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone) if err == nil || err.Error() != errorMessage { - t.Errorf("Expected EnsureTCPLoadBalancer region mismatch error.") + t.Errorf("Expected EnsureLoadBalancer region mismatch error.") } - err = c.EnsureTCPLoadBalancerDeleted("elb-name", badELBRegion) + err = c.EnsureLoadBalancerDeleted("elb-name", badELBRegion) if err == nil || err.Error() != errorMessage { - t.Errorf("Expected EnsureTCPLoadBalancerDeleted region mismatch error.") + t.Errorf("Expected EnsureLoadBalancerDeleted region mismatch error.") } - err = c.UpdateTCPLoadBalancer("elb-name", badELBRegion, nil) + err = c.UpdateLoadBalancer("elb-name", badELBRegion, nil) if err == nil || err.Error() != errorMessage { - t.Errorf("Expected UpdateTCPLoadBalancer region mismatch error.") + t.Errorf("Expected UpdateLoadBalancer region mismatch error.") } } diff --git a/pkg/cloudprovider/providers/fake/doc.go b/pkg/cloudprovider/providers/fake/doc.go index fb73be6e6cd..ff22d568f9f 100644 --- a/pkg/cloudprovider/providers/fake/doc.go +++ b/pkg/cloudprovider/providers/fake/doc.go @@ -15,5 +15,5 @@ limitations under the License. */ // Package fake is a test-double implementation of cloudprovider -// Interface, TCPLoadBalancer and Instances. It is useful for testing. +// Interface, LoadBalancer and Instances. It is useful for testing. package fake diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index a11030d2adf..5a67612858c 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -44,7 +44,7 @@ type FakeUpdateBalancerCall struct { Hosts []string } -// FakeCloud is a test-double implementation of Interface, TCPLoadBalancer, Instances, and Routes. It is useful for testing. +// FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing. type FakeCloud struct { Exists bool Err error @@ -99,9 +99,9 @@ func (f *FakeCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []s return nameservers, searches } -// TCPLoadBalancer returns a fake implementation of TCPLoadBalancer. +// LoadBalancer returns a fake implementation of LoadBalancer. // Actually it just returns f itself. -func (f *FakeCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { +func (f *FakeCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return f, true } @@ -120,17 +120,17 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) { return f, true } -// GetTCPLoadBalancer is a stub implementation of TCPLoadBalancer.GetTCPLoadBalancer. -func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { +// GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer. +func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} return status, f.Exists, f.Err } -// EnsureTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. +// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // It adds an entry "create" into the internal method call record. -func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { +func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { f.addCall("create") if f.Balancers == nil { f.Balancers = make(map[string]FakeBalancer) @@ -143,17 +143,17 @@ func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP return status, f.Err } -// UpdateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.UpdateTCPLoadBalancer. +// UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // It adds an entry "update" into the internal method call record. -func (f *FakeCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { +func (f *FakeCloud) UpdateLoadBalancer(name, region string, hosts []string) error { f.addCall("update") f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts}) return f.Err } -// EnsureTCPLoadBalancerDeleted is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. +// EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted. // It adds an entry "delete" into the internal method call record. -func (f *FakeCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { +func (f *FakeCloud) EnsureLoadBalancerDeleted(name, region string) error { f.addCall("delete") return f.Err } diff --git a/pkg/cloudprovider/providers/gce/doc.go b/pkg/cloudprovider/providers/gce/doc.go index 1136fa64009..93acc5a3142 100644 --- a/pkg/cloudprovider/providers/gce/doc.go +++ b/pkg/cloudprovider/providers/gce/doc.go @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package gce is an implementation of Interface, TCPLoadBalancer +// Package gce is an implementation of Interface, LoadBalancer // and Instances for Google Compute Engine. package gce diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index d3f26acdeb9..8f624f7bbd1 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -60,7 +60,7 @@ const ( operationPollTimeoutDuration = 30 * time.Minute ) -// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine. +// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. type GCECloud struct { service *compute.Service containerService *container.Service @@ -253,8 +253,8 @@ func (gce *GCECloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut [] return nameservers, srchOut } -// TCPLoadBalancer returns an implementation of TCPLoadBalancer for Google Compute Engine. -func (gce *GCECloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { +// LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine. +func (gce *GCECloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return gce, true } @@ -350,8 +350,8 @@ func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error { }) } -// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer -func (gce *GCECloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { +// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer +func (gce *GCECloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() if err == nil { status := &api.LoadBalancerStatus{} @@ -370,16 +370,18 @@ func isHTTPErrorCode(err error, code int) bool { return ok && apiErr.Code == code } -// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. +// EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer. // Our load balancers in GCE consist of four separate GCE resources - a static // IP address, a firewall rule, a target pool, and a forwarding rule. This // function has to manage all of them. // Due to an interesting series of design decisions, this handles both creating // new load balancers and updating existing load balancers, recognizing when // each is needed. -func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { +func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, ports, hosts) + if len(hosts) == 0 { - return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts") + return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts") } // Check if the forwarding rule exists, and if so, what its IP is. @@ -426,7 +428,7 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net. // Deal with the firewall next. The reason we do this here rather than last // is because the forwarding rule is used as the indicator that the load - // balancer is fully created - it's what getTCPLoadBalancer checks for. + // balancer is fully created - it's what getLoadBalancer checks for. firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports) if err != nil { return nil, err @@ -515,6 +517,11 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP if portRange != fwd.PortRange { return true, true, fwd.IPAddress, nil } + // The service controller verified all the protocols match on the ports, just check the first one + if string(ports[0].Protocol) != fwd.IPProtocol { + return true, true, fwd.IPAddress, nil + } + return true, false, fwd.IPAddress, nil } @@ -522,6 +529,12 @@ func loadBalancerPortRange(ports []*api.ServicePort) (string, error) { if len(ports) == 0 { return "", fmt.Errorf("no ports specified for GCE load balancer") } + + // The service controller verified all the protocols match on the ports, just check and use the first one + if ports[0].Protocol != api.ProtocolTCP && ports[0].Protocol != api.ProtocolUDP { + return "", fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(ports[0].Protocol)) + } + minPort := 65536 maxPort := 0 for i := range ports { @@ -575,7 +588,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [ if fw.Description != makeFirewallDescription(ipAddress) { return true, true, nil } - if len(fw.Allowed) != 1 || fw.Allowed[0].IPProtocol != "tcp" { + if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") { return true, true, nil } // Make sure the allowed ports match. @@ -586,6 +599,8 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [ if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) { return true, true, nil } + // The service controller already verified that the protocol matches on all ports, no need to check. + return true, false, nil } @@ -617,11 +632,10 @@ func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports return err } req := &compute.ForwardingRule{ - Name: name, - IPAddress: ipAddress, - IPProtocol: "TCP", - PortRange: portRange, - Target: gce.targetPoolURL(name, region), + Name: name, + IPAddress: ipAddress, IPProtocol: string(ports[0].Protocol), + PortRange: portRange, + Target: gce.targetPoolURL(name, region), } op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() @@ -713,7 +727,7 @@ func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api TargetTags: hostTags, Allowed: []*compute.FirewallAllowed{ { - IPProtocol: "tcp", + IPProtocol: strings.ToLower(string(ports[0].Protocol)), Ports: allowedPorts, }, }, @@ -803,8 +817,8 @@ func (gce *GCECloud) createOrPromoteStaticIP(name, region, existingIP string) (i return address.Address, nil } -// UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer. -func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { +// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer. +func (gce *GCECloud) UpdateLoadBalancer(name, region string, hosts []string) error { pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() if err != nil { return err @@ -864,12 +878,12 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) return nil } -// EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. -func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error { +// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted. +func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error { err := utilerrors.AggregateGoroutines( func() error { return gce.deleteFirewall(name, region) }, // Even though we don't hold on to static IPs for load balancers, it's - // possible that EnsureTCPLoadBalancer left one around in a failed + // possible that EnsureLoadBalancer left one around in a failed // creation/update attempt, so make sure we clean it up here just in case. func() error { return gce.deleteStaticIP(name, region) }, func() error { diff --git a/pkg/cloudprovider/providers/mesos/mesos.go b/pkg/cloudprovider/providers/mesos/mesos.go index 538c3de5a10..606ffad0e7f 100644 --- a/pkg/cloudprovider/providers/mesos/mesos.go +++ b/pkg/cloudprovider/providers/mesos/mesos.go @@ -102,10 +102,10 @@ func (c *MesosCloud) Instances() (cloudprovider.Instances, bool) { return c, true } -// TCPLoadBalancer always returns nil, false in this implementation. +// LoadBalancer always returns nil, false in this implementation. // Mesos does not provide any type of native load balancing by default, // so this implementation always returns (nil, false). -func (c *MesosCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { +func (c *MesosCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return nil, false } diff --git a/pkg/cloudprovider/providers/mesos/mesos_test.go b/pkg/cloudprovider/providers/mesos/mesos_test.go index b7f909bf1e4..b504f4ef038 100644 --- a/pkg/cloudprovider/providers/mesos/mesos_test.go +++ b/pkg/cloudprovider/providers/mesos/mesos_test.go @@ -121,15 +121,15 @@ func Test_Instances(t *testing.T) { } } -// test mesos.TCPLoadBalancer +// test mesos.LoadBalancer func Test_TcpLoadBalancer(t *testing.T) { defer log.Flush() mesosCloud, _ := newMesosCloud(nil) - lb, supports_lb := mesosCloud.TCPLoadBalancer() + lb, supports_lb := mesosCloud.LoadBalancer() if supports_lb || lb != nil { - t.Fatalf("MesosCloud does not provide an implementation of TCPLoadBalancer") + t.Fatalf("MesosCloud does not provide an implementation of LoadBalancer") } } diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index bfc34cd5410..9cc3b96bf64 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -495,8 +495,8 @@ type LoadBalancer struct { opts LoadBalancerOpts } -func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { - glog.V(4).Info("openstack.TCPLoadBalancer() called") +func (os *OpenStack) LoadBalancer() (cloudprovider.LoadBalancer, bool) { + glog.V(4).Info("openstack.LoadBalancer() called") // TODO: Search for and support Rackspace loadbalancer API, and others. network, err := openstack.NewNetworkV2(os.provider, gophercloud.EndpointOpts{ @@ -515,7 +515,7 @@ func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { return nil, false } - glog.V(1).Info("Claiming to support TCPLoadBalancer") + glog.V(1).Info("Claiming to support LoadBalancer") return &LoadBalancer{network, compute, os.lbOpts}, true } @@ -630,7 +630,7 @@ func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*f return &floatingIPList[0], nil } -func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { +func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { vip, err := getVipByName(lb.network, name) if err == ErrNotFound { return nil, false, nil @@ -650,11 +650,18 @@ func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalanc // a list of regions (from config) and query/create loadbalancers in // each region. -func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(4).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity) +func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity) if len(ports) > 1 { return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") + } else if len(ports) == 0 { + return nil, fmt.Errorf("no ports provided to openstack load balancer") + } + + // The service controller verified all the protocols match on the ports, just check and use the first one + if ports[0].Protocol != api.ProtocolTCP { + return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") } var persistence *vips.SessionPersistence @@ -668,7 +675,7 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI } glog.V(2).Info("Checking if openstack load balancer already exists: %s", name) - _, exists, err := lb.GetTCPLoadBalancer(name, region) + _, exists, err := lb.GetLoadBalancer(name, region) if err != nil { return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) } @@ -676,7 +683,7 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI // TODO: Implement a more efficient update strategy for common changes than delete & create // In particular, if we implement hosts update, we can get rid of UpdateHosts if exists { - err := lb.EnsureTCPLoadBalancerDeleted(name, region) + err := lb.EnsureLoadBalancerDeleted(name, region) if err != nil { return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) } @@ -777,8 +784,8 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI } -func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error { - glog.V(4).Infof("UpdateTCPLoadBalancer(%v, %v, %v)", name, region, hosts) +func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string) error { + glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", name, region, hosts) vip, err := getVipByName(lb.network, name) if err != nil { @@ -838,8 +845,8 @@ func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []strin return nil } -func (lb *LoadBalancer) EnsureTCPLoadBalancerDeleted(name, region string) error { - glog.V(4).Infof("EnsureTCPLoadBalancerDeleted(%v, %v)", name, region) +func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error { + glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", name, region) vip, err := getVipByName(lb.network, name) if err != nil && err != ErrNotFound { diff --git a/pkg/cloudprovider/providers/openstack/openstack_test.go b/pkg/cloudprovider/providers/openstack/openstack_test.go index be70f496d02..7f575ed8c11 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_test.go +++ b/pkg/cloudprovider/providers/openstack/openstack_test.go @@ -151,7 +151,7 @@ func TestInstances(t *testing.T) { t.Logf("Found NodeAddresses(%s) = %s\n", srvs[0], addrs) } -func TestTCPLoadBalancer(t *testing.T) { +func TestLoadBalancer(t *testing.T) { cfg, ok := configFromEnv() if !ok { t.Skipf("No config found in environment") @@ -162,17 +162,17 @@ func TestTCPLoadBalancer(t *testing.T) { t.Fatalf("Failed to construct/authenticate OpenStack: %s", err) } - lb, ok := os.TCPLoadBalancer() + lb, ok := os.LoadBalancer() if !ok { - t.Fatalf("TCPLoadBalancer() returned false - perhaps your stack doesn't support Neutron?") + t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?") } - _, exists, err := lb.GetTCPLoadBalancer("noexist", "region") + _, exists, err := lb.GetLoadBalancer("noexist", "region") if err != nil { - t.Fatalf("GetTCPLoadBalancer(\"noexist\") returned error: %s", err) + t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err) } if exists { - t.Fatalf("GetTCPLoadBalancer(\"noexist\") returned exists") + t.Fatalf("GetLoadBalancer(\"noexist\") returned exists") } } diff --git a/pkg/cloudprovider/providers/ovirt/ovirt.go b/pkg/cloudprovider/providers/ovirt/ovirt.go index a448a639743..ba30633d767 100644 --- a/pkg/cloudprovider/providers/ovirt/ovirt.go +++ b/pkg/cloudprovider/providers/ovirt/ovirt.go @@ -128,8 +128,8 @@ func (v *OVirtCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut [] return nameservers, searches } -// TCPLoadBalancer returns an implementation of TCPLoadBalancer for oVirt cloud -func (v *OVirtCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { +// LoadBalancer returns an implementation of LoadBalancer for oVirt cloud +func (v *OVirtCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return nil, false } diff --git a/pkg/cloudprovider/providers/rackspace/rackspace.go b/pkg/cloudprovider/providers/rackspace/rackspace.go index 9de46ed9e1b..6cb95dbdb14 100644 --- a/pkg/cloudprovider/providers/rackspace/rackspace.go +++ b/pkg/cloudprovider/providers/rackspace/rackspace.go @@ -367,7 +367,7 @@ func (os *Rackspace) ScrubDNS(nameservers, searches []string) (nsOut, srchOut [] return nameservers, searches } -func (os *Rackspace) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { +func (os *Rackspace) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return nil, false } diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index be331bd3f44..d6f8ceb5490 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -69,7 +69,7 @@ type ServiceController struct { cloud cloudprovider.Interface kubeClient client.Interface clusterName string - balancer cloudprovider.TCPLoadBalancer + balancer cloudprovider.LoadBalancer zone cloudprovider.Zone cache *serviceCache eventBroadcaster record.EventBroadcaster @@ -150,9 +150,9 @@ func (s *ServiceController) init() error { return fmt.Errorf("ServiceController should not be run without a cloudprovider.") } - balancer, ok := s.cloud.TCPLoadBalancer() + balancer, ok := s.cloud.LoadBalancer() if !ok { - return fmt.Errorf("the cloud provider does not support TCP load balancers.") + return fmt.Errorf("the cloud provider does not support external load balancers.") } s.balancer = balancer @@ -256,7 +256,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { s.cache.set(namespacedName.String(), cachedService) case cache.Deleted: s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") - err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) + err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) if err != nil { message := "Error deleting load balancer (will retry): " + err.Error() s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message) @@ -278,7 +278,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name return nil, notRetryable } - // Note: It is safe to just call EnsureTCPLoadBalancer. But, on some clouds that requires a delete & create, + // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create, // which may involve service interruption. Also, we would like user-friendly events. // Save the state so we can avoid a write if it doesn't change @@ -293,8 +293,8 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name } else { // If we don't have any cached memory of the load balancer, we have to ask // the cloud provider for what it knows about it. - // Technically EnsureTCPLoadBalancerDeleted can cope, but we want to post meaningful events - _, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region) + // Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events + _, exists, err := s.balancer.GetLoadBalancer(s.loadBalancerName(service), s.zone.Region) if err != nil { return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable } @@ -306,7 +306,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name if needDelete { glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName) s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") - if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { + if err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { return err, retryable } s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") @@ -378,7 +378,9 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error { return err } name := s.loadBalancerName(service) - status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP), + // getPortsForLB already verified that the protocol matches for all ports. + // The cloud provider will verify the protocol is supported + status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP), ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { return err @@ -482,15 +484,18 @@ func (s *ServiceController) loadBalancerName(service *api.Service) string { } func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { + var protocol api.Protocol + ports := []*api.ServicePort{} for i := range service.Spec.Ports { - // TODO: Support UDP. Remove the check from the API validation package once - // it's supported. sp := &service.Spec.Ports[i] - if sp.Protocol != api.ProtocolTCP { - return nil, fmt.Errorf("load balancers for non TCP services are not currently supported.") - } + // The check on protocol was removed here. The cloud provider itself is now responsible for all protocol validation ports = append(ports, sp) + if protocol == "" { + protocol = sp.Protocol + } else if protocol != sp.Protocol && wantsLoadBalancer(service) { + return nil, fmt.Errorf("mixed protocol external load balancers are not supported.") + } } return ports, nil } @@ -667,7 +672,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h return } if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil { - glog.Errorf("External error while updating TCP load balancer: %v.", err) + glog.Errorf("External error while updating load balancer: %v.", err) servicesToRetry = append(servicesToRetry, service) } }() @@ -684,15 +689,15 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, // This operation doesn't normally take very long (and happens pretty often), so we only record the final event name := cloudprovider.GetLoadBalancerName(service) - err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts) + err := s.balancer.UpdateLoadBalancer(name, s.zone.Region, hosts) if err == nil { s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts") return nil } // It's only an actual error if the load balancer still exists. - if _, exists, err := s.balancer.GetTCPLoadBalancer(name, s.zone.Region); err != nil { - glog.Errorf("External error while checking if TCP load balancer %q exists: name, %v", name, err) + if _, exists, err := s.balancer.GetLoadBalancer(name, s.zone.Region); err != nil { + glog.Errorf("External error while checking if load balancer %q exists: name, %v", name, err) } else if !exists { return nil } diff --git a/pkg/controller/service/servicecontroller_test.go b/pkg/controller/service/servicecontroller_test.go index 54e7a119073..72ae9eb3dc4 100644 --- a/pkg/controller/service/servicecontroller_test.go +++ b/pkg/controller/service/servicecontroller_test.go @@ -65,8 +65,8 @@ func TestCreateExternalLoadBalancer(t *testing.T) { Type: api.ServiceTypeLoadBalancer, }, }, - expectErr: true, - expectCreateAttempt: false, + expectErr: false, + expectCreateAttempt: true, }, { service: &api.Service{ diff --git a/test/e2e/cluster_upgrade.go b/test/e2e/cluster_upgrade.go index 38d120849f8..870eb29b0ef 100644 --- a/test/e2e/cluster_upgrade.go +++ b/test/e2e/cluster_upgrade.go @@ -162,10 +162,10 @@ var _ = Describe("Cluster Upgrade [Skipped]", func() { }) f := NewFramework("cluster-upgrade") - var w *WebserverTest + var w *ServerTest BeforeEach(func() { By("Setting up the service, RC, and pods") - w = NewWebserverTest(f.Client, f.Namespace.Name, svcName) + w = NewServerTest(f.Client, f.Namespace.Name, svcName) rc := w.CreateWebserverRC(replicas) rcName = rc.ObjectMeta.Name svc := w.BuildServiceSpec() @@ -200,6 +200,38 @@ var _ = Describe("Cluster Upgrade [Skipped]", func() { BeforeEach(func() { SkipUnlessProviderIs("gce") }) + // The version is determined once at the beginning of the test so that + // the master and nodes won't be skewed if the value changes during the + // test. + By(fmt.Sprintf("Getting real version for %q", testContext.UpgradeTarget)) + var err error + v, err = realVersion(testContext.UpgradeTarget) + expectNoError(err) + Logf("Version for %q is %q", testContext.UpgradeTarget, v) + + By("Setting up the service, RC, and pods") + f.beforeEach() + w = NewServerTest(f.Client, f.Namespace.Name, svcName) + rc := w.CreateWebserverRC(replicas) + rcName = rc.ObjectMeta.Name + svc := w.BuildServiceSpec() + svc.Spec.Type = api.ServiceTypeLoadBalancer + w.CreateService(svc) + + By("Waiting for the service to become reachable") + result, err := waitForLoadBalancerIngress(f.Client, svcName, f.Namespace.Name) + Expect(err).NotTo(HaveOccurred()) + ingresses := result.Status.LoadBalancer.Ingress + if len(ingresses) != 1 { + Failf("Was expecting only 1 ingress IP but got %d (%v): %v", len(ingresses), ingresses, result) + } + ingress = ingresses[0] + Logf("Got load balancer ingress point %v", ingress) + ip = ingress.IP + if ip == "" { + ip = ingress.Hostname + } + testLoadBalancerReachable(ingress, 80) It("of master should maintain responsive services", func() { By("Validating cluster before master upgrade") diff --git a/test/e2e/ingress.go b/test/e2e/ingress.go index d9f0037823e..5bd9d010b2f 100644 --- a/test/e2e/ingress.go +++ b/test/e2e/ingress.go @@ -177,7 +177,7 @@ func createApp(c *client.Client, ns string, i int) { Expect(err).NotTo(HaveOccurred()) Logf("Creating rc %v", name) - rc := rcByNamePort(name, 1, testImage, httpContainerPort, l) + rc := rcByNamePort(name, 1, testImage, httpContainerPort, api.ProtocolTCP, l) rc.Spec.Template.Spec.Containers[0].Args = []string{ "--num=1", fmt.Sprintf("--start=%d", i), diff --git a/test/e2e/resize_nodes.go b/test/e2e/resize_nodes.go index 5d0d3b07034..d40d789d83d 100644 --- a/test/e2e/resize_nodes.go +++ b/test/e2e/resize_nodes.go @@ -173,11 +173,11 @@ func rcByName(name string, replicas int, image string, labels map[string]string) }) } -func rcByNamePort(name string, replicas int, image string, port int, labels map[string]string) *api.ReplicationController { +func rcByNamePort(name string, replicas int, image string, port int, protocol api.Protocol, labels map[string]string) *api.ReplicationController { return rcByNameContainer(name, replicas, image, labels, api.Container{ Name: name, Image: image, - Ports: []api.ContainerPort{{ContainerPort: port}}, + Ports: []api.ContainerPort{{ContainerPort: port, Protocol: protocol}}, }) } @@ -215,7 +215,7 @@ func rcByNameContainer(name string, replicas int, image string, labels map[strin func newRCByName(c *client.Client, ns, name string, replicas int) (*api.ReplicationController, error) { By(fmt.Sprintf("creating replication controller %s", name)) return c.ReplicationControllers(ns).Create(rcByNamePort( - name, replicas, serveHostnameImage, 9376, map[string]string{})) + name, replicas, serveHostnameImage, 9376, api.ProtocolTCP, map[string]string{})) } func resizeRC(c *client.Client, ns, name string, replicas int) error { diff --git a/test/e2e/service.go b/test/e2e/service.go index 83d61e529b5..ab330eba3ec 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "math/rand" + "net" "net/http" "sort" "strconv" @@ -354,7 +355,7 @@ var _ = Describe("Services", func() { serviceName := "nodeportservice-test" ns := f.Namespace.Name - t := NewWebserverTest(c, ns, serviceName) + t := NewServerTest(c, ns, serviceName) defer func() { defer GinkgoRecover() errs := t.Cleanup() @@ -410,7 +411,7 @@ var _ = Describe("Services", func() { serviceName := "mutability-service-test" - t := NewWebserverTest(f.Client, f.Namespace.Name, serviceName) + t := NewServerTest(f.Client, f.Namespace.Name, serviceName) defer func() { defer GinkgoRecover() errs := t.Cleanup() @@ -609,7 +610,7 @@ var _ = Describe("Services", func() { serviceName2 := baseName + "2" ns := f.Namespace.Name - t := NewWebserverTest(c, ns, serviceName1) + t := NewServerTest(c, ns, serviceName1) defer func() { defer GinkgoRecover() errs := t.Cleanup() @@ -661,7 +662,7 @@ var _ = Describe("Services", func() { serviceName := "nodeport-range-test" ns := f.Namespace.Name - t := NewWebserverTest(c, ns, serviceName) + t := NewServerTest(c, ns, serviceName) defer func() { defer GinkgoRecover() errs := t.Cleanup() @@ -727,7 +728,7 @@ var _ = Describe("Services", func() { serviceName := "nodeport-reuse" ns := f.Namespace.Name - t := NewWebserverTest(c, ns, serviceName) + t := NewServerTest(c, ns, serviceName) defer func() { defer GinkgoRecover() errs := t.Cleanup() @@ -795,7 +796,7 @@ var _ = Describe("Services", func() { servicePort := 9376 By("creating service " + serviceName + " with load balancer in namespace " + ns1) - t1 := NewWebserverTest(c, ns1, serviceName) + t1 := NewServerTest(c, ns1, serviceName) svc1 := t1.BuildServiceSpec() svc1.Spec.Type = api.ServiceTypeLoadBalancer svc1.Spec.Ports[0].Port = servicePort @@ -819,18 +820,20 @@ var _ = Describe("Services", func() { }() } - By("creating service " + serviceName + " with load balancer in namespace " + ns2) - t2 := NewWebserverTest(c, ns2, serviceName) + By("creating service " + serviceName + " with UDP load balancer in namespace " + ns2) + t2 := NewNetcatTest(c, ns2, serviceName) svc2 := t2.BuildServiceSpec() svc2.Spec.Type = api.ServiceTypeLoadBalancer svc2.Spec.Ports[0].Port = servicePort + // Let this one be UDP so that we can test that as well without an additional test + svc2.Spec.Ports[0].Protocol = api.ProtocolUDP svc2.Spec.Ports[0].TargetPort = intstr.FromInt(80) svc2.Spec.LoadBalancerIP = loadBalancerIP _, err = t2.CreateService(svc2) Expect(err).NotTo(HaveOccurred()) By("creating pod to be part of service " + serviceName + " in namespace " + ns2) - t2.CreateWebserverRC(2) + t2.CreateNetcatRC(2) ingressPoints := []string{} svcs := []*api.Service{svc1, svc2} @@ -865,11 +868,19 @@ var _ = Describe("Services", func() { } ingressPoints = append(ingressPoints, ing) // Save 'em to check uniqueness - By("hitting the pod through the service's NodePort") - testReachable(pickNodeIP(c), port.NodePort) + if svc1.Spec.Ports[0].Protocol == api.ProtocolTCP { + By("hitting the pod through the service's NodePort") + testReachable(pickNodeIP(c), port.NodePort) - By("hitting the pod through the service's external load balancer") - testLoadBalancerReachable(ingress, servicePort) + By("hitting the pod through the service's external load balancer") + testLoadBalancerReachable(ingress, servicePort) + } else { + By("hitting the pod through the service's NodePort") + testNetcatReachable(pickNodeIP(c), port.NodePort) + + By("hitting the pod through the service's external load balancer") + testNetcatLoadBalancerReachable(ingress, servicePort) + } } validateUniqueOrFail(ingressPoints) }) @@ -1160,6 +1171,15 @@ func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool { return testLoadBalancerReachableInTime(ingress, port, podStartTimeout) } +func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) { + ip := ingress.IP + if ip == "" { + ip = ingress.Hostname + } + + testNetcatReachable(ip, port) +} + func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool { ip := ingress.IP if ip == "" { @@ -1182,6 +1202,31 @@ func testReachable(ip string, port int) bool { return testReachableInTime(ip, port, podStartTimeout) } +func testNetcatReachable(ip string, port int) { + con, err := net.Dial("udp", ip+":"+string(port)) + if err != nil { + Failf("Failed to connect to: %s:%d (%s)", ip, port, err.Error()) + } + + _, err = con.Write([]byte("\n")) + if err != nil { + Failf("Failed to send newline: %s", err.Error()) + } + + var buf []byte = make([]byte, len("SUCCESS")+1) + + _, err = con.Read(buf) + if err != nil { + Failf("Failed to read result: %s", err.Error()) + } + + if !strings.HasPrefix(string(buf), "SUCCESS") { + Failf("Failed to retrieve: \"SUCCESS\"") + } + + Logf("Successfully retrieved \"SUCCESS\"") +} + func testReachableInTime(ip string, port int, timeout time.Duration) bool { url := fmt.Sprintf("http://%s:%d", ip, port) if ip == "" { @@ -1427,7 +1472,7 @@ func httpGetNoConnectionPool(url string) (*http.Response, error) { } // Simple helper class to avoid too much boilerplate in tests -type WebserverTest struct { +type ServerTest struct { ServiceName string Namespace string Client *client.Client @@ -1441,8 +1486,8 @@ type WebserverTest struct { image string } -func NewWebserverTest(client *client.Client, namespace string, serviceName string) *WebserverTest { - t := &WebserverTest{} +func NewServerTest(client *client.Client, namespace string, serviceName string) *ServerTest { + t := &ServerTest{} t.Client = client t.Namespace = namespace t.ServiceName = serviceName @@ -1460,8 +1505,27 @@ func NewWebserverTest(client *client.Client, namespace string, serviceName strin return t } +func NewNetcatTest(client *client.Client, namespace string, serviceName string) *ServerTest { + t := &ServerTest{} + t.Client = client + t.Namespace = namespace + t.ServiceName = serviceName + t.TestId = t.ServiceName + "-" + string(util.NewUUID()) + t.Labels = map[string]string{ + "testid": t.TestId, + } + + t.rcs = make(map[string]bool) + t.services = make(map[string]bool) + + t.name = "netcat" + t.image = "ubuntu" + + return t +} + // Build default config for a service (which can then be changed) -func (t *WebserverTest) BuildServiceSpec() *api.Service { +func (t *ServerTest) BuildServiceSpec() *api.Service { service := &api.Service{ ObjectMeta: api.ObjectMeta{ Name: t.ServiceName, @@ -1480,8 +1544,24 @@ func (t *WebserverTest) BuildServiceSpec() *api.Service { // CreateWebserverRC creates rc-backed pods with the well-known webserver // configuration and records it for cleanup. -func (t *WebserverTest) CreateWebserverRC(replicas int) *api.ReplicationController { - rcSpec := rcByNamePort(t.name, replicas, t.image, 80, t.Labels) +func (t *ServerTest) CreateWebserverRC(replicas int) *api.ReplicationController { + rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolTCP, t.Labels) + rcAct, err := t.createRC(rcSpec) + if err != nil { + Failf("Failed to create rc %s: %v", rcSpec.Name, err) + } + if err := verifyPods(t.Client, t.Namespace, t.name, false, replicas); err != nil { + Failf("Failed to create %d pods with name %s: %v", replicas, t.name, err) + } + return rcAct +} + +// CreateNetcatRC creates rc-backed pods with a netcat listener +// configuration and records it for cleanup. +func (t *ServerTest) CreateNetcatRC(replicas int) *api.ReplicationController { + rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolUDP, t.Labels) + rcSpec.Spec.Template.Spec.Containers[0].Command = []string{"/bin/bash"} + rcSpec.Spec.Template.Spec.Containers[0].Args = []string{"-c", "echo SUCCESS | nc -q 0 -u -l 0.0.0.0 80"} rcAct, err := t.createRC(rcSpec) if err != nil { Failf("Failed to create rc %s: %v", rcSpec.Name, err) @@ -1493,7 +1573,7 @@ func (t *WebserverTest) CreateWebserverRC(replicas int) *api.ReplicationControll } // createRC creates a replication controller and records it for cleanup. -func (t *WebserverTest) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) { +func (t *ServerTest) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) { rc, err := t.Client.ReplicationControllers(t.Namespace).Create(rc) if err == nil { t.rcs[rc.Name] = true @@ -1502,7 +1582,7 @@ func (t *WebserverTest) createRC(rc *api.ReplicationController) (*api.Replicatio } // Create a service, and record it for cleanup -func (t *WebserverTest) CreateService(service *api.Service) (*api.Service, error) { +func (t *ServerTest) CreateService(service *api.Service) (*api.Service, error) { result, err := t.Client.Services(t.Namespace).Create(service) if err == nil { t.services[service.Name] = true @@ -1511,7 +1591,7 @@ func (t *WebserverTest) CreateService(service *api.Service) (*api.Service, error } // Delete a service, and remove it from the cleanup list -func (t *WebserverTest) DeleteService(serviceName string) error { +func (t *ServerTest) DeleteService(serviceName string) error { err := t.Client.Services(t.Namespace).Delete(serviceName) if err == nil { delete(t.services, serviceName) @@ -1519,7 +1599,7 @@ func (t *WebserverTest) DeleteService(serviceName string) error { return err } -func (t *WebserverTest) Cleanup() []error { +func (t *ServerTest) Cleanup() []error { var errs []error for rcName := range t.rcs { By("stopping RC " + rcName + " in namespace " + t.Namespace) From 9e6c45c395b4cda611c6b325aa70c8475d446047 Mon Sep 17 00:00:00 2001 From: Kenneth Shelton Date: Sun, 6 Dec 2015 21:23:56 +0000 Subject: [PATCH 2/2] Updated comments Updated documentation Fixed e2e test --- docs/user-guide/services.md | 5 - pkg/api/validation/validation.go | 2 +- .../providers/openstack/openstack.go | 1 + pkg/controller/service/servicecontroller.go | 6 +- test/e2e/cluster_upgrade.go | 32 ----- test/e2e/service.go | 124 +++++++++++------- 6 files changed, 79 insertions(+), 91 deletions(-) diff --git a/docs/user-guide/services.md b/docs/user-guide/services.md index 4a6f86d2531..4596ce382c7 100644 --- a/docs/user-guide/services.md +++ b/docs/user-guide/services.md @@ -418,9 +418,6 @@ Valid values for the `ServiceType` field are: which forwards to the `Service` exposed as a `:NodePort` for each Node. -Note that while `NodePort`s can be TCP or UDP, `LoadBalancer`s only support TCP -as of Kubernetes 1.0. - ### Type NodePort If you set the `type` field to `"NodePort"`, the Kubernetes master will @@ -537,8 +534,6 @@ This makes some kinds of firewalling impossible. The iptables proxier does not obscure in-cluster source IPs, but it does still impact clients coming through a load-balancer or node-port. -LoadBalancers only support TCP, not UDP. - The `Type` field is designed as nested functionality - each level adds to the previous. This is not strictly required on all cloud providers (e.g. Google Compute Engine does not need to allocate a `NodePort` to make `LoadBalancer` work, but AWS does) diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index e46ae862bcf..f99733eeacb 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -1458,7 +1458,7 @@ func ValidateService(service *api.Service) field.ErrorList { for i := range service.Spec.Ports { portPath := portsPath.Index(i) if !supportedPortProtocols.Has(string(service.Spec.Ports[i].Protocol)) { - allErrs = append(allErrs, validation.NewInvalidError(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports")) + allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports")) } } } diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index 9cc3b96bf64..c94d22376df 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -660,6 +660,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n } // The service controller verified all the protocols match on the ports, just check and use the first one + // TODO: Convert all error messages to use an event recorder if ports[0].Protocol != api.ProtocolTCP { return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") } diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index d6f8ceb5490..221645b7d31 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -378,8 +378,9 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error { return err } name := s.loadBalancerName(service) - // getPortsForLB already verified that the protocol matches for all ports. - // The cloud provider will verify the protocol is supported + // - Only one protocol supported per service + // - Not all cloud providers support all protocols and the next step is expected to return + // an error for unsupported protocols status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP), ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { @@ -494,6 +495,7 @@ func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { if protocol == "" { protocol = sp.Protocol } else if protocol != sp.Protocol && wantsLoadBalancer(service) { + // TODO: Convert error messages to use event recorder return nil, fmt.Errorf("mixed protocol external load balancers are not supported.") } } diff --git a/test/e2e/cluster_upgrade.go b/test/e2e/cluster_upgrade.go index 870eb29b0ef..af4d0c1c60b 100644 --- a/test/e2e/cluster_upgrade.go +++ b/test/e2e/cluster_upgrade.go @@ -200,38 +200,6 @@ var _ = Describe("Cluster Upgrade [Skipped]", func() { BeforeEach(func() { SkipUnlessProviderIs("gce") }) - // The version is determined once at the beginning of the test so that - // the master and nodes won't be skewed if the value changes during the - // test. - By(fmt.Sprintf("Getting real version for %q", testContext.UpgradeTarget)) - var err error - v, err = realVersion(testContext.UpgradeTarget) - expectNoError(err) - Logf("Version for %q is %q", testContext.UpgradeTarget, v) - - By("Setting up the service, RC, and pods") - f.beforeEach() - w = NewServerTest(f.Client, f.Namespace.Name, svcName) - rc := w.CreateWebserverRC(replicas) - rcName = rc.ObjectMeta.Name - svc := w.BuildServiceSpec() - svc.Spec.Type = api.ServiceTypeLoadBalancer - w.CreateService(svc) - - By("Waiting for the service to become reachable") - result, err := waitForLoadBalancerIngress(f.Client, svcName, f.Namespace.Name) - Expect(err).NotTo(HaveOccurred()) - ingresses := result.Status.LoadBalancer.Ingress - if len(ingresses) != 1 { - Failf("Was expecting only 1 ingress IP but got %d (%v): %v", len(ingresses), ingresses, result) - } - ingress = ingresses[0] - Logf("Got load balancer ingress point %v", ingress) - ip = ingress.IP - if ip == "" { - ip = ingress.Hostname - } - testLoadBalancerReachable(ingress, 80) It("of master should maintain responsive services", func() { By("Validating cluster before master upgrade") diff --git a/test/e2e/service.go b/test/e2e/service.go index ab330eba3ec..cead6421543 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -825,7 +825,7 @@ var _ = Describe("Services", func() { svc2 := t2.BuildServiceSpec() svc2.Spec.Type = api.ServiceTypeLoadBalancer svc2.Spec.Ports[0].Port = servicePort - // Let this one be UDP so that we can test that as well without an additional test + // UDP loadbalancing is tested via test NetcatTest svc2.Spec.Ports[0].Protocol = api.ProtocolUDP svc2.Spec.Ports[0].TargetPort = intstr.FromInt(80) svc2.Spec.LoadBalancerIP = loadBalancerIP @@ -1171,13 +1171,14 @@ func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool { return testLoadBalancerReachableInTime(ingress, port, podStartTimeout) } -func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) { - ip := ingress.IP - if ip == "" { - ip = ingress.Hostname - } +func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool { + return testNetcatLoadBalancerReachableInTime(ingress, port, podStartTimeout) +} - testNetcatReachable(ip, port) +func conditionFuncDecorator(ip string, port int, fn func(string, int) (bool, error)) wait.ConditionFunc { + return func() (bool, error) { + return fn(ip, port) + } } func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool { @@ -1186,7 +1187,17 @@ func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, ip = ingress.Hostname } - return testReachableInTime(ip, port, timeout) + return testReachableInTime(conditionFuncDecorator(ip, port, testReachable), timeout) + +} + +func testNetcatLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool { + ip := ingress.IP + if ip == "" { + ip = ingress.Hostname + } + + return testReachableInTime(conditionFuncDecorator(ip, port, testNetcatReachable), timeout) } func testLoadBalancerNotReachable(ingress api.LoadBalancerIngress, port int) { @@ -1198,72 +1209,83 @@ func testLoadBalancerNotReachable(ingress api.LoadBalancerIngress, port int) { testNotReachable(ip, port) } -func testReachable(ip string, port int) bool { - return testReachableInTime(ip, port, podStartTimeout) +func testReachable(ip string, port int) (bool, error) { + url := fmt.Sprintf("http://%s:%d", ip, port) + if ip == "" { + Failf("Got empty IP for reachability check (%s)", url) + return false, nil + } + if port == 0 { + Failf("Got port==0 for reachability check (%s)", url) + return false, nil + } + + Logf("Testing reachability of %v", url) + + resp, err := httpGetNoConnectionPool(url) + if err != nil { + Logf("Got error waiting for reachability of %s: %v", url, err) + return false, nil + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + Logf("Got error reading response from %s: %v", url, err) + return false, nil + } + if resp.StatusCode != 200 { + return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body)) + } + if !strings.Contains(string(body), "test-webserver") { + return false, fmt.Errorf("received response body without expected substring 'test-webserver': %s", string(body)) + } + Logf("Successfully reached %v", url) + return true, nil } -func testNetcatReachable(ip string, port int) { +func testNetcatReachable(ip string, port int) (bool, error) { + uri := fmt.Sprintf("udp://%s:%d", ip, port) + if ip == "" { + Failf("Got empty IP for reachability check (%s)", uri) + return false, nil + } + if port == 0 { + Failf("Got port==0 for reachability check (%s)", uri) + return false, nil + } + + Logf("Testing reachability of %v", uri) + con, err := net.Dial("udp", ip+":"+string(port)) if err != nil { - Failf("Failed to connect to: %s:%d (%s)", ip, port, err.Error()) + return false, fmt.Errorf("Failed to connect to: %s:%d (%s)", ip, port, err.Error()) } _, err = con.Write([]byte("\n")) if err != nil { - Failf("Failed to send newline: %s", err.Error()) + return false, fmt.Errorf("Failed to send newline: %s", err.Error()) } var buf []byte = make([]byte, len("SUCCESS")+1) _, err = con.Read(buf) if err != nil { - Failf("Failed to read result: %s", err.Error()) + return false, fmt.Errorf("Failed to read result: %s", err.Error()) } if !strings.HasPrefix(string(buf), "SUCCESS") { - Failf("Failed to retrieve: \"SUCCESS\"") + return false, fmt.Errorf("Failed to retrieve: \"SUCCESS\"") } Logf("Successfully retrieved \"SUCCESS\"") + return true, nil } -func testReachableInTime(ip string, port int, timeout time.Duration) bool { - url := fmt.Sprintf("http://%s:%d", ip, port) - if ip == "" { - Failf("Got empty IP for reachability check (%s)", url) - return false - } - if port == 0 { - Failf("Got port==0 for reachability check (%s)", url) - return false - } - - desc := fmt.Sprintf("the url %s to be reachable", url) - By(fmt.Sprintf("Waiting up to %v for %s", timeout, desc)) - start := time.Now() - err := wait.PollImmediate(poll, timeout, func() (bool, error) { - resp, err := httpGetNoConnectionPool(url) - if err != nil { - Logf("Got error waiting for reachability of %s: %v (%v)", url, err, time.Since(start)) - return false, nil - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - Logf("Got error reading response from %s: %v", url, err) - return false, nil - } - if resp.StatusCode != 200 { - return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body)) - } - if !strings.Contains(string(body), "test-webserver") { - return false, fmt.Errorf("received response body without expected substring 'test-webserver': %s", string(body)) - } - Logf("Successfully reached %v", url) - return true, nil - }) +func testReachableInTime(testFunc wait.ConditionFunc, timeout time.Duration) bool { + By(fmt.Sprintf("Waiting up to %v", timeout)) + err := wait.PollImmediate(poll, timeout, testFunc) if err != nil { - Expect(err).NotTo(HaveOccurred(), "Error waiting for %s", desc) + Expect(err).NotTo(HaveOccurred(), "Error waiting") return false } return true