Merge pull request #14431 from Defensative/UDP-LB

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-01-08 12:39:02 -08:00
commit 37b5726716
22 changed files with 319 additions and 184 deletions

View File

@ -418,9 +418,6 @@ Valid values for the `ServiceType` field are:
which forwards to the `Service` exposed as a `<NodeIP>:NodePort`
for each Node.
Note that while `NodePort`s can be TCP or UDP, `LoadBalancer`s only support TCP
as of Kubernetes 1.0.
### Type NodePort
If you set the `type` field to `"NodePort"`, the Kubernetes master will
@ -537,8 +534,6 @@ This makes some kinds of firewalling impossible. The iptables proxier does not
obscure in-cluster source IPs, but it does still impact clients coming through
a load-balancer or node-port.
LoadBalancers only support TCP, not UDP.
The `Type` field is designed as nested functionality - each level adds to the
previous. This is not strictly required on all cloud providers (e.g. Google Compute Engine does
not need to allocate a `NodePort` to make `LoadBalancer` work, but AWS does)

View File

@ -1457,8 +1457,8 @@ func ValidateService(service *api.Service) field.ErrorList {
portsPath := specPath.Child("ports")
for i := range service.Spec.Ports {
portPath := portsPath.Index(i)
if service.Spec.Ports[i].Protocol != api.ProtocolTCP {
allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "may not use protocols other than 'TCP' when `type` is 'LoadBalancer'"))
if !supportedPortProtocols.Has(string(service.Spec.Ports[i].Protocol)) {
allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports"))
}
}
}

View File

@ -2095,20 +2095,20 @@ func TestValidateService(t *testing.T) {
numErrs: 1,
},
{
name: "invalid load balancer protocol 1",
name: "valid load balancer protocol UDP 1",
tweakSvc: func(s *api.Service) {
s.Spec.Type = api.ServiceTypeLoadBalancer
s.Spec.Ports[0].Protocol = "UDP"
},
numErrs: 1,
numErrs: 0,
},
{
name: "invalid load balancer protocol 2",
name: "valid load balancer protocol UDP 2",
tweakSvc: func(s *api.Service) {
s.Spec.Type = api.ServiceTypeLoadBalancer
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "UDP", TargetPort: intstr.FromInt(12345)})
},
numErrs: 1,
numErrs: 0,
},
{
name: "valid 1",

View File

@ -27,8 +27,8 @@ import (
// Interface is an abstract, pluggable interface for cloud providers.
type Interface interface {
// TCPLoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
TCPLoadBalancer() (TCPLoadBalancer, bool)
// LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
LoadBalancer() (LoadBalancer, bool)
// Instances returns an instances interface. Also returns true if the interface is supported, false otherwise.
Instances() (Instances, bool)
// Zones returns a zones interface. Also returns true if the interface is supported, false otherwise.
@ -76,23 +76,23 @@ func GetInstanceProviderID(cloud Interface, nodeName string) (string, error) {
return cloud.ProviderName() + "://" + instanceID, nil
}
// TCPLoadBalancer is an abstract, pluggable interface for TCP load balancers.
type TCPLoadBalancer interface {
// LoadBalancer is an abstract, pluggable interface for load balancers.
type LoadBalancer interface {
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
// GetTCPLoadBalancer returns whether the specified load balancer exists, and
// GetLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is.
GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
// EnsureTCPLoadBalancer creates a new tcp load balancer, or updates an existing one. Returns the status of the balancer
EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
// UpdateTCPLoadBalancer updates hosts under the specified load balancer.
UpdateTCPLoadBalancer(name, region string, hosts []string) error
// EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
// EnsureLoadBalancer creates a new load balancer, or updates an existing one. Returns the status of the balancer
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
// UpdateLoadBalancer updates hosts under the specified load balancer.
UpdateLoadBalancer(name, region string, hosts []string) error
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted.
// This construction is useful because many cloud providers' load balancers
// have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around.
EnsureTCPLoadBalancerDeleted(name, region string) error
EnsureLoadBalancerDeleted(name, region string) error
}
// Instances is an abstract, pluggable interface for sets of instances.

View File

@ -179,7 +179,7 @@ type InstanceGroupInfo interface {
CurrentSize() (int, error)
}
// AWSCloud is an implementation of Interface, TCPLoadBalancer and Instances for Amazon Web Services.
// AWSCloud is an implementation of Interface, LoadBalancer and Instances for Amazon Web Services.
type AWSCloud struct {
ec2 EC2
elb ELB
@ -608,8 +608,8 @@ func (aws *AWSCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, searches
}
// TCPLoadBalancer returns an implementation of TCPLoadBalancer for Amazon Web Services.
func (s *AWSCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
// LoadBalancer returns an implementation of LoadBalancer for Amazon Web Services.
func (s *AWSCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return s, true
}
@ -1684,8 +1684,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) {
// EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts)
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts)
if region != s.region {
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
@ -1696,6 +1696,15 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
}
if len(ports) == 0 {
return nil, fmt.Errorf("requested load balancer with no ports")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB")
}
if publicIP != nil {
return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB")
}
@ -1801,8 +1810,8 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
return status, nil
}
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer
func (s *AWSCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (s *AWSCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
if region != s.region {
return nil, false, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
@ -1965,8 +1974,8 @@ func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalan
return nil
}
// EnsureTCPLoadBalancerDeleted implements TCPLoadBalancer.EnsureTCPLoadBalancerDeleted.
func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
// EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted.
func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
if region != s.region {
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
@ -2059,8 +2068,8 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
return nil
}
// UpdateTCPLoadBalancer implements TCPLoadBalancer.UpdateTCPLoadBalancer
func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
func (s *AWSCloud) UpdateLoadBalancer(name, region string, hosts []string) error {
if region != s.region {
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}

View File

@ -693,24 +693,24 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) {
badELBRegion := "bad-elb-region"
errorMessage := fmt.Sprintf("requested load balancer region '%s' does not match cluster region '%s'", badELBRegion, c.region)
_, _, err = c.GetTCPLoadBalancer("elb-name", badELBRegion)
_, _, err = c.GetLoadBalancer("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected GetTCPLoadBalancer region mismatch error.")
t.Errorf("Expected GetLoadBalancer region mismatch error.")
}
_, err = c.EnsureTCPLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone)
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureTCPLoadBalancer region mismatch error.")
t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
}
err = c.EnsureTCPLoadBalancerDeleted("elb-name", badELBRegion)
err = c.EnsureLoadBalancerDeleted("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureTCPLoadBalancerDeleted region mismatch error.")
t.Errorf("Expected EnsureLoadBalancerDeleted region mismatch error.")
}
err = c.UpdateTCPLoadBalancer("elb-name", badELBRegion, nil)
err = c.UpdateLoadBalancer("elb-name", badELBRegion, nil)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected UpdateTCPLoadBalancer region mismatch error.")
t.Errorf("Expected UpdateLoadBalancer region mismatch error.")
}
}

View File

@ -15,5 +15,5 @@ limitations under the License.
*/
// Package fake is a test-double implementation of cloudprovider
// Interface, TCPLoadBalancer and Instances. It is useful for testing.
// Interface, LoadBalancer and Instances. It is useful for testing.
package fake

View File

@ -44,7 +44,7 @@ type FakeUpdateBalancerCall struct {
Hosts []string
}
// FakeCloud is a test-double implementation of Interface, TCPLoadBalancer, Instances, and Routes. It is useful for testing.
// FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing.
type FakeCloud struct {
Exists bool
Err error
@ -99,9 +99,9 @@ func (f *FakeCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []s
return nameservers, searches
}
// TCPLoadBalancer returns a fake implementation of TCPLoadBalancer.
// LoadBalancer returns a fake implementation of LoadBalancer.
// Actually it just returns f itself.
func (f *FakeCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
func (f *FakeCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return f, true
}
@ -120,17 +120,17 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) {
return f, true
}
// GetTCPLoadBalancer is a stub implementation of TCPLoadBalancer.GetTCPLoadBalancer.
func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
// GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer.
func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
return status, f.Exists, f.Err
}
// EnsureTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
// It adds an entry "create" into the internal method call record.
func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
f.addCall("create")
if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer)
@ -143,17 +143,17 @@ func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP
return status, f.Err
}
// UpdateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.UpdateTCPLoadBalancer.
// UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
// It adds an entry "update" into the internal method call record.
func (f *FakeCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
func (f *FakeCloud) UpdateLoadBalancer(name, region string, hosts []string) error {
f.addCall("update")
f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts})
return f.Err
}
// EnsureTCPLoadBalancerDeleted is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted.
// EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted.
// It adds an entry "delete" into the internal method call record.
func (f *FakeCloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
func (f *FakeCloud) EnsureLoadBalancerDeleted(name, region string) error {
f.addCall("delete")
return f.Err
}

View File

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package gce is an implementation of Interface, TCPLoadBalancer
// Package gce is an implementation of Interface, LoadBalancer
// and Instances for Google Compute Engine.
package gce

View File

@ -60,7 +60,7 @@ const (
operationPollTimeoutDuration = 30 * time.Minute
)
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
service *compute.Service
containerService *container.Service
@ -253,8 +253,8 @@ func (gce *GCECloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, srchOut
}
// TCPLoadBalancer returns an implementation of TCPLoadBalancer for Google Compute Engine.
func (gce *GCECloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
// LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine.
func (gce *GCECloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return gce, true
}
@ -350,8 +350,8 @@ func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
})
}
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer
func (gce *GCECloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err == nil {
status := &api.LoadBalancerStatus{}
@ -370,16 +370,18 @@ func isHTTPErrorCode(err error, code int) bool {
return ok && apiErr.Code == code
}
// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
// EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer.
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, ports, hosts)
if len(hosts) == 0 {
return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts")
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
}
// Check if the forwarding rule exists, and if so, what its IP is.
@ -426,7 +428,7 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, requestedIP net.
// Deal with the firewall next. The reason we do this here rather than last
// is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getTCPLoadBalancer checks for.
// balancer is fully created - it's what getLoadBalancer checks for.
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports)
if err != nil {
return nil, err
@ -515,6 +517,11 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP
if portRange != fwd.PortRange {
return true, true, fwd.IPAddress, nil
}
// The service controller verified all the protocols match on the ports, just check the first one
if string(ports[0].Protocol) != fwd.IPProtocol {
return true, true, fwd.IPAddress, nil
}
return true, false, fwd.IPAddress, nil
}
@ -522,6 +529,12 @@ func loadBalancerPortRange(ports []*api.ServicePort) (string, error) {
if len(ports) == 0 {
return "", fmt.Errorf("no ports specified for GCE load balancer")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
if ports[0].Protocol != api.ProtocolTCP && ports[0].Protocol != api.ProtocolUDP {
return "", fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(ports[0].Protocol))
}
minPort := 65536
maxPort := 0
for i := range ports {
@ -575,7 +588,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [
if fw.Description != makeFirewallDescription(ipAddress) {
return true, true, nil
}
if len(fw.Allowed) != 1 || fw.Allowed[0].IPProtocol != "tcp" {
if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") {
return true, true, nil
}
// Make sure the allowed ports match.
@ -586,6 +599,8 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [
if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil
}
// The service controller already verified that the protocol matches on all ports, no need to check.
return true, false, nil
}
@ -617,11 +632,10 @@ func (gce *GCECloud) createForwardingRule(name, region, ipAddress string, ports
return err
}
req := &compute.ForwardingRule{
Name: name,
IPAddress: ipAddress,
IPProtocol: "TCP",
PortRange: portRange,
Target: gce.targetPoolURL(name, region),
Name: name,
IPAddress: ipAddress, IPProtocol: string(ports[0].Protocol),
PortRange: portRange,
Target: gce.targetPoolURL(name, region),
}
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
@ -713,7 +727,7 @@ func (gce *GCECloud) firewallObject(name, region, ipAddress string, ports []*api
TargetTags: hostTags,
Allowed: []*compute.FirewallAllowed{
{
IPProtocol: "tcp",
IPProtocol: strings.ToLower(string(ports[0].Protocol)),
Ports: allowedPorts,
},
},
@ -803,8 +817,8 @@ func (gce *GCECloud) createOrPromoteStaticIP(name, region, existingIP string) (i
return address.Address, nil
}
// UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer.
func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(name, region string, hosts []string) error {
pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
if err != nil {
return err
@ -864,12 +878,12 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)
return nil
}
// EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted.
func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error {
err := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(name, region) },
// Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureTCPLoadBalancer left one around in a failed
// possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case.
func() error { return gce.deleteStaticIP(name, region) },
func() error {

View File

@ -102,10 +102,10 @@ func (c *MesosCloud) Instances() (cloudprovider.Instances, bool) {
return c, true
}
// TCPLoadBalancer always returns nil, false in this implementation.
// LoadBalancer always returns nil, false in this implementation.
// Mesos does not provide any type of native load balancing by default,
// so this implementation always returns (nil, false).
func (c *MesosCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
func (c *MesosCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false
}

View File

@ -121,15 +121,15 @@ func Test_Instances(t *testing.T) {
}
}
// test mesos.TCPLoadBalancer
// test mesos.LoadBalancer
func Test_TcpLoadBalancer(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
lb, supports_lb := mesosCloud.TCPLoadBalancer()
lb, supports_lb := mesosCloud.LoadBalancer()
if supports_lb || lb != nil {
t.Fatalf("MesosCloud does not provide an implementation of TCPLoadBalancer")
t.Fatalf("MesosCloud does not provide an implementation of LoadBalancer")
}
}

View File

@ -495,8 +495,8 @@ type LoadBalancer struct {
opts LoadBalancerOpts
}
func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
glog.V(4).Info("openstack.TCPLoadBalancer() called")
func (os *OpenStack) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
glog.V(4).Info("openstack.LoadBalancer() called")
// TODO: Search for and support Rackspace loadbalancer API, and others.
network, err := openstack.NewNetworkV2(os.provider, gophercloud.EndpointOpts{
@ -515,7 +515,7 @@ func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
return nil, false
}
glog.V(1).Info("Claiming to support TCPLoadBalancer")
glog.V(1).Info("Claiming to support LoadBalancer")
return &LoadBalancer{network, compute, os.lbOpts}, true
}
@ -630,7 +630,7 @@ func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*f
return &floatingIPList[0], nil
}
func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
vip, err := getVipByName(lb.network, name)
if err == ErrNotFound {
return nil, false, nil
@ -650,11 +650,19 @@ func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalanc
// a list of regions (from config) and query/create loadbalancers in
// each region.
func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity)
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity)
if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
// TODO: Convert all error messages to use an event recorder
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
}
var persistence *vips.SessionPersistence
@ -668,7 +676,7 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI
}
glog.V(2).Info("Checking if openstack load balancer already exists: %s", name)
_, exists, err := lb.GetTCPLoadBalancer(name, region)
_, exists, err := lb.GetLoadBalancer(name, region)
if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
}
@ -676,7 +684,7 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := lb.EnsureTCPLoadBalancerDeleted(name, region)
err := lb.EnsureLoadBalancerDeleted(name, region)
if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
}
@ -777,8 +785,8 @@ func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, loadBalancerI
}
func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
glog.V(4).Infof("UpdateTCPLoadBalancer(%v, %v, %v)", name, region, hosts)
func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string) error {
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", name, region, hosts)
vip, err := getVipByName(lb.network, name)
if err != nil {
@ -838,8 +846,8 @@ func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []strin
return nil
}
func (lb *LoadBalancer) EnsureTCPLoadBalancerDeleted(name, region string) error {
glog.V(4).Infof("EnsureTCPLoadBalancerDeleted(%v, %v)", name, region)
func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error {
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", name, region)
vip, err := getVipByName(lb.network, name)
if err != nil && err != ErrNotFound {

View File

@ -151,7 +151,7 @@ func TestInstances(t *testing.T) {
t.Logf("Found NodeAddresses(%s) = %s\n", srvs[0], addrs)
}
func TestTCPLoadBalancer(t *testing.T) {
func TestLoadBalancer(t *testing.T) {
cfg, ok := configFromEnv()
if !ok {
t.Skipf("No config found in environment")
@ -162,17 +162,17 @@ func TestTCPLoadBalancer(t *testing.T) {
t.Fatalf("Failed to construct/authenticate OpenStack: %s", err)
}
lb, ok := os.TCPLoadBalancer()
lb, ok := os.LoadBalancer()
if !ok {
t.Fatalf("TCPLoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
}
_, exists, err := lb.GetTCPLoadBalancer("noexist", "region")
_, exists, err := lb.GetLoadBalancer("noexist", "region")
if err != nil {
t.Fatalf("GetTCPLoadBalancer(\"noexist\") returned error: %s", err)
t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
}
if exists {
t.Fatalf("GetTCPLoadBalancer(\"noexist\") returned exists")
t.Fatalf("GetLoadBalancer(\"noexist\") returned exists")
}
}

View File

@ -128,8 +128,8 @@ func (v *OVirtCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, searches
}
// TCPLoadBalancer returns an implementation of TCPLoadBalancer for oVirt cloud
func (v *OVirtCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
// LoadBalancer returns an implementation of LoadBalancer for oVirt cloud
func (v *OVirtCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false
}

View File

@ -367,7 +367,7 @@ func (os *Rackspace) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, searches
}
func (os *Rackspace) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
func (os *Rackspace) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false
}

View File

@ -69,7 +69,7 @@ type ServiceController struct {
cloud cloudprovider.Interface
kubeClient client.Interface
clusterName string
balancer cloudprovider.TCPLoadBalancer
balancer cloudprovider.LoadBalancer
zone cloudprovider.Zone
cache *serviceCache
eventBroadcaster record.EventBroadcaster
@ -150,9 +150,9 @@ func (s *ServiceController) init() error {
return fmt.Errorf("ServiceController should not be run without a cloudprovider.")
}
balancer, ok := s.cloud.TCPLoadBalancer()
balancer, ok := s.cloud.LoadBalancer()
if !ok {
return fmt.Errorf("the cloud provider does not support TCP load balancers.")
return fmt.Errorf("the cloud provider does not support external load balancers.")
}
s.balancer = balancer
@ -256,7 +256,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
s.cache.set(namespacedName.String(), cachedService)
case cache.Deleted:
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region)
err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region)
if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
@ -278,7 +278,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
return nil, notRetryable
}
// Note: It is safe to just call EnsureTCPLoadBalancer. But, on some clouds that requires a delete & create,
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
// which may involve service interruption. Also, we would like user-friendly events.
// Save the state so we can avoid a write if it doesn't change
@ -293,8 +293,8 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
} else {
// If we don't have any cached memory of the load balancer, we have to ask
// the cloud provider for what it knows about it.
// Technically EnsureTCPLoadBalancerDeleted can cope, but we want to post meaningful events
_, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region)
// Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
_, exists, err := s.balancer.GetLoadBalancer(s.loadBalancerName(service), s.zone.Region)
if err != nil {
return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
}
@ -306,7 +306,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
if needDelete {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil {
if err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil {
return err, retryable
}
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
@ -378,7 +378,10 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
return err
}
name := s.loadBalancerName(service)
status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
// - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
if err != nil {
return err
@ -482,15 +485,19 @@ func (s *ServiceController) loadBalancerName(service *api.Service) string {
}
func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
var protocol api.Protocol
ports := []*api.ServicePort{}
for i := range service.Spec.Ports {
// TODO: Support UDP. Remove the check from the API validation package once
// it's supported.
sp := &service.Spec.Ports[i]
if sp.Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("load balancers for non TCP services are not currently supported.")
}
// The check on protocol was removed here. The cloud provider itself is now responsible for all protocol validation
ports = append(ports, sp)
if protocol == "" {
protocol = sp.Protocol
} else if protocol != sp.Protocol && wantsLoadBalancer(service) {
// TODO: Convert error messages to use event recorder
return nil, fmt.Errorf("mixed protocol external load balancers are not supported.")
}
}
return ports, nil
}
@ -667,7 +674,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h
return
}
if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil {
glog.Errorf("External error while updating TCP load balancer: %v.", err)
glog.Errorf("External error while updating load balancer: %v.", err)
servicesToRetry = append(servicesToRetry, service)
}
}()
@ -684,15 +691,15 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
name := cloudprovider.GetLoadBalancerName(service)
err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts)
err := s.balancer.UpdateLoadBalancer(name, s.zone.Region, hosts)
if err == nil {
s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
return nil
}
// It's only an actual error if the load balancer still exists.
if _, exists, err := s.balancer.GetTCPLoadBalancer(name, s.zone.Region); err != nil {
glog.Errorf("External error while checking if TCP load balancer %q exists: name, %v", name, err)
if _, exists, err := s.balancer.GetLoadBalancer(name, s.zone.Region); err != nil {
glog.Errorf("External error while checking if load balancer %q exists: name, %v", name, err)
} else if !exists {
return nil
}

View File

@ -65,8 +65,8 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
Type: api.ServiceTypeLoadBalancer,
},
},
expectErr: true,
expectCreateAttempt: false,
expectErr: false,
expectCreateAttempt: true,
},
{
service: &api.Service{

View File

@ -162,10 +162,10 @@ var _ = Describe("Cluster Upgrade [Skipped]", func() {
})
f := NewFramework("cluster-upgrade")
var w *WebserverTest
var w *ServerTest
BeforeEach(func() {
By("Setting up the service, RC, and pods")
w = NewWebserverTest(f.Client, f.Namespace.Name, svcName)
w = NewServerTest(f.Client, f.Namespace.Name, svcName)
rc := w.CreateWebserverRC(replicas)
rcName = rc.ObjectMeta.Name
svc := w.BuildServiceSpec()

View File

@ -178,7 +178,7 @@ func createApp(c *client.Client, ns string, i int) {
Expect(err).NotTo(HaveOccurred())
Logf("Creating rc %v", name)
rc := rcByNamePort(name, 1, testImage, httpContainerPort, l)
rc := rcByNamePort(name, 1, testImage, httpContainerPort, api.ProtocolTCP, l)
rc.Spec.Template.Spec.Containers[0].Args = []string{
"--num=1",
fmt.Sprintf("--start=%d", i),

View File

@ -173,11 +173,11 @@ func rcByName(name string, replicas int, image string, labels map[string]string)
})
}
func rcByNamePort(name string, replicas int, image string, port int, labels map[string]string) *api.ReplicationController {
func rcByNamePort(name string, replicas int, image string, port int, protocol api.Protocol, labels map[string]string) *api.ReplicationController {
return rcByNameContainer(name, replicas, image, labels, api.Container{
Name: name,
Image: image,
Ports: []api.ContainerPort{{ContainerPort: port}},
Ports: []api.ContainerPort{{ContainerPort: port, Protocol: protocol}},
})
}
@ -215,7 +215,7 @@ func rcByNameContainer(name string, replicas int, image string, labels map[strin
func newRCByName(c *client.Client, ns, name string, replicas int) (*api.ReplicationController, error) {
By(fmt.Sprintf("creating replication controller %s", name))
return c.ReplicationControllers(ns).Create(rcByNamePort(
name, replicas, serveHostnameImage, 9376, map[string]string{}))
name, replicas, serveHostnameImage, 9376, api.ProtocolTCP, map[string]string{}))
}
func resizeRC(c *client.Client, ns, name string, replicas int) error {

View File

@ -20,6 +20,7 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"sort"
"strconv"
@ -354,7 +355,7 @@ var _ = Describe("Services", func() {
serviceName := "nodeportservice-test"
ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName)
t := NewServerTest(c, ns, serviceName)
defer func() {
defer GinkgoRecover()
errs := t.Cleanup()
@ -410,7 +411,7 @@ var _ = Describe("Services", func() {
serviceName := "mutability-service-test"
t := NewWebserverTest(f.Client, f.Namespace.Name, serviceName)
t := NewServerTest(f.Client, f.Namespace.Name, serviceName)
defer func() {
defer GinkgoRecover()
errs := t.Cleanup()
@ -609,7 +610,7 @@ var _ = Describe("Services", func() {
serviceName2 := baseName + "2"
ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName1)
t := NewServerTest(c, ns, serviceName1)
defer func() {
defer GinkgoRecover()
errs := t.Cleanup()
@ -661,7 +662,7 @@ var _ = Describe("Services", func() {
serviceName := "nodeport-range-test"
ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName)
t := NewServerTest(c, ns, serviceName)
defer func() {
defer GinkgoRecover()
errs := t.Cleanup()
@ -727,7 +728,7 @@ var _ = Describe("Services", func() {
serviceName := "nodeport-reuse"
ns := f.Namespace.Name
t := NewWebserverTest(c, ns, serviceName)
t := NewServerTest(c, ns, serviceName)
defer func() {
defer GinkgoRecover()
errs := t.Cleanup()
@ -795,7 +796,7 @@ var _ = Describe("Services", func() {
servicePort := 9376
By("creating service " + serviceName + " with load balancer in namespace " + ns1)
t1 := NewWebserverTest(c, ns1, serviceName)
t1 := NewServerTest(c, ns1, serviceName)
svc1 := t1.BuildServiceSpec()
svc1.Spec.Type = api.ServiceTypeLoadBalancer
svc1.Spec.Ports[0].Port = servicePort
@ -819,18 +820,20 @@ var _ = Describe("Services", func() {
}()
}
By("creating service " + serviceName + " with load balancer in namespace " + ns2)
t2 := NewWebserverTest(c, ns2, serviceName)
By("creating service " + serviceName + " with UDP load balancer in namespace " + ns2)
t2 := NewNetcatTest(c, ns2, serviceName)
svc2 := t2.BuildServiceSpec()
svc2.Spec.Type = api.ServiceTypeLoadBalancer
svc2.Spec.Ports[0].Port = servicePort
// UDP loadbalancing is tested via test NetcatTest
svc2.Spec.Ports[0].Protocol = api.ProtocolUDP
svc2.Spec.Ports[0].TargetPort = intstr.FromInt(80)
svc2.Spec.LoadBalancerIP = loadBalancerIP
_, err = t2.CreateService(svc2)
Expect(err).NotTo(HaveOccurred())
By("creating pod to be part of service " + serviceName + " in namespace " + ns2)
t2.CreateWebserverRC(2)
t2.CreateNetcatRC(2)
ingressPoints := []string{}
svcs := []*api.Service{svc1, svc2}
@ -865,11 +868,19 @@ var _ = Describe("Services", func() {
}
ingressPoints = append(ingressPoints, ing) // Save 'em to check uniqueness
By("hitting the pod through the service's NodePort")
testReachable(pickNodeIP(c), port.NodePort)
if svc1.Spec.Ports[0].Protocol == api.ProtocolTCP {
By("hitting the pod through the service's NodePort")
testReachable(pickNodeIP(c), port.NodePort)
By("hitting the pod through the service's external load balancer")
testLoadBalancerReachable(ingress, servicePort)
By("hitting the pod through the service's external load balancer")
testLoadBalancerReachable(ingress, servicePort)
} else {
By("hitting the pod through the service's NodePort")
testNetcatReachable(pickNodeIP(c), port.NodePort)
By("hitting the pod through the service's external load balancer")
testNetcatLoadBalancerReachable(ingress, servicePort)
}
}
validateUniqueOrFail(ingressPoints)
})
@ -1160,13 +1171,33 @@ func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool {
return testLoadBalancerReachableInTime(ingress, port, podStartTimeout)
}
func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool {
return testNetcatLoadBalancerReachableInTime(ingress, port, podStartTimeout)
}
func conditionFuncDecorator(ip string, port int, fn func(string, int) (bool, error)) wait.ConditionFunc {
return func() (bool, error) {
return fn(ip, port)
}
}
func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool {
ip := ingress.IP
if ip == "" {
ip = ingress.Hostname
}
return testReachableInTime(ip, port, timeout)
return testReachableInTime(conditionFuncDecorator(ip, port, testReachable), timeout)
}
func testNetcatLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool {
ip := ingress.IP
if ip == "" {
ip = ingress.Hostname
}
return testReachableInTime(conditionFuncDecorator(ip, port, testNetcatReachable), timeout)
}
func testLoadBalancerNotReachable(ingress api.LoadBalancerIngress, port int) {
@ -1178,47 +1209,83 @@ func testLoadBalancerNotReachable(ingress api.LoadBalancerIngress, port int) {
testNotReachable(ip, port)
}
func testReachable(ip string, port int) bool {
return testReachableInTime(ip, port, podStartTimeout)
}
func testReachableInTime(ip string, port int, timeout time.Duration) bool {
func testReachable(ip string, port int) (bool, error) {
url := fmt.Sprintf("http://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", url)
return false
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", url)
return false
return false, nil
}
desc := fmt.Sprintf("the url %s to be reachable", url)
By(fmt.Sprintf("Waiting up to %v for %s", timeout, desc))
start := time.Now()
err := wait.PollImmediate(poll, timeout, func() (bool, error) {
resp, err := httpGetNoConnectionPool(url)
if err != nil {
Logf("Got error waiting for reachability of %s: %v (%v)", url, err, time.Since(start))
return false, nil
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logf("Got error reading response from %s: %v", url, err)
return false, nil
}
if resp.StatusCode != 200 {
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body))
}
if !strings.Contains(string(body), "test-webserver") {
return false, fmt.Errorf("received response body without expected substring 'test-webserver': %s", string(body))
}
Logf("Successfully reached %v", url)
return true, nil
})
Logf("Testing reachability of %v", url)
resp, err := httpGetNoConnectionPool(url)
if err != nil {
Expect(err).NotTo(HaveOccurred(), "Error waiting for %s", desc)
Logf("Got error waiting for reachability of %s: %v", url, err)
return false, nil
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logf("Got error reading response from %s: %v", url, err)
return false, nil
}
if resp.StatusCode != 200 {
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body))
}
if !strings.Contains(string(body), "test-webserver") {
return false, fmt.Errorf("received response body without expected substring 'test-webserver': %s", string(body))
}
Logf("Successfully reached %v", url)
return true, nil
}
func testNetcatReachable(ip string, port int) (bool, error) {
uri := fmt.Sprintf("udp://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", uri)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
}
Logf("Testing reachability of %v", uri)
con, err := net.Dial("udp", ip+":"+string(port))
if err != nil {
return false, fmt.Errorf("Failed to connect to: %s:%d (%s)", ip, port, err.Error())
}
_, err = con.Write([]byte("\n"))
if err != nil {
return false, fmt.Errorf("Failed to send newline: %s", err.Error())
}
var buf []byte = make([]byte, len("SUCCESS")+1)
_, err = con.Read(buf)
if err != nil {
return false, fmt.Errorf("Failed to read result: %s", err.Error())
}
if !strings.HasPrefix(string(buf), "SUCCESS") {
return false, fmt.Errorf("Failed to retrieve: \"SUCCESS\"")
}
Logf("Successfully retrieved \"SUCCESS\"")
return true, nil
}
func testReachableInTime(testFunc wait.ConditionFunc, timeout time.Duration) bool {
By(fmt.Sprintf("Waiting up to %v", timeout))
err := wait.PollImmediate(poll, timeout, testFunc)
if err != nil {
Expect(err).NotTo(HaveOccurred(), "Error waiting")
return false
}
return true
@ -1427,7 +1494,7 @@ func httpGetNoConnectionPool(url string) (*http.Response, error) {
}
// Simple helper class to avoid too much boilerplate in tests
type WebserverTest struct {
type ServerTest struct {
ServiceName string
Namespace string
Client *client.Client
@ -1441,8 +1508,8 @@ type WebserverTest struct {
image string
}
func NewWebserverTest(client *client.Client, namespace string, serviceName string) *WebserverTest {
t := &WebserverTest{}
func NewServerTest(client *client.Client, namespace string, serviceName string) *ServerTest {
t := &ServerTest{}
t.Client = client
t.Namespace = namespace
t.ServiceName = serviceName
@ -1460,8 +1527,27 @@ func NewWebserverTest(client *client.Client, namespace string, serviceName strin
return t
}
func NewNetcatTest(client *client.Client, namespace string, serviceName string) *ServerTest {
t := &ServerTest{}
t.Client = client
t.Namespace = namespace
t.ServiceName = serviceName
t.TestId = t.ServiceName + "-" + string(util.NewUUID())
t.Labels = map[string]string{
"testid": t.TestId,
}
t.rcs = make(map[string]bool)
t.services = make(map[string]bool)
t.name = "netcat"
t.image = "ubuntu"
return t
}
// Build default config for a service (which can then be changed)
func (t *WebserverTest) BuildServiceSpec() *api.Service {
func (t *ServerTest) BuildServiceSpec() *api.Service {
service := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: t.ServiceName,
@ -1480,8 +1566,24 @@ func (t *WebserverTest) BuildServiceSpec() *api.Service {
// CreateWebserverRC creates rc-backed pods with the well-known webserver
// configuration and records it for cleanup.
func (t *WebserverTest) CreateWebserverRC(replicas int) *api.ReplicationController {
rcSpec := rcByNamePort(t.name, replicas, t.image, 80, t.Labels)
func (t *ServerTest) CreateWebserverRC(replicas int) *api.ReplicationController {
rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolTCP, t.Labels)
rcAct, err := t.createRC(rcSpec)
if err != nil {
Failf("Failed to create rc %s: %v", rcSpec.Name, err)
}
if err := verifyPods(t.Client, t.Namespace, t.name, false, replicas); err != nil {
Failf("Failed to create %d pods with name %s: %v", replicas, t.name, err)
}
return rcAct
}
// CreateNetcatRC creates rc-backed pods with a netcat listener
// configuration and records it for cleanup.
func (t *ServerTest) CreateNetcatRC(replicas int) *api.ReplicationController {
rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolUDP, t.Labels)
rcSpec.Spec.Template.Spec.Containers[0].Command = []string{"/bin/bash"}
rcSpec.Spec.Template.Spec.Containers[0].Args = []string{"-c", "echo SUCCESS | nc -q 0 -u -l 0.0.0.0 80"}
rcAct, err := t.createRC(rcSpec)
if err != nil {
Failf("Failed to create rc %s: %v", rcSpec.Name, err)
@ -1493,7 +1595,7 @@ func (t *WebserverTest) CreateWebserverRC(replicas int) *api.ReplicationControll
}
// createRC creates a replication controller and records it for cleanup.
func (t *WebserverTest) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) {
func (t *ServerTest) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) {
rc, err := t.Client.ReplicationControllers(t.Namespace).Create(rc)
if err == nil {
t.rcs[rc.Name] = true
@ -1502,7 +1604,7 @@ func (t *WebserverTest) createRC(rc *api.ReplicationController) (*api.Replicatio
}
// Create a service, and record it for cleanup
func (t *WebserverTest) CreateService(service *api.Service) (*api.Service, error) {
func (t *ServerTest) CreateService(service *api.Service) (*api.Service, error) {
result, err := t.Client.Services(t.Namespace).Create(service)
if err == nil {
t.services[service.Name] = true
@ -1511,7 +1613,7 @@ func (t *WebserverTest) CreateService(service *api.Service) (*api.Service, error
}
// Delete a service, and remove it from the cleanup list
func (t *WebserverTest) DeleteService(serviceName string) error {
func (t *ServerTest) DeleteService(serviceName string) error {
err := t.Client.Services(t.Namespace).Delete(serviceName)
if err == nil {
delete(t.services, serviceName)
@ -1519,7 +1621,7 @@ func (t *WebserverTest) DeleteService(serviceName string) error {
return err
}
func (t *WebserverTest) Cleanup() []error {
func (t *ServerTest) Cleanup() []error {
var errs []error
for rcName := range t.rcs {
By("stopping RC " + rcName + " in namespace " + t.Namespace)