WIP: Implement multi-port Services

This commit is contained in:
Tim Hockin
2015-03-13 08:16:41 -07:00
parent 9ed87612d0
commit 186818d787
70 changed files with 2118 additions and 815 deletions

View File

@@ -136,7 +136,10 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
handler := NewServiceHandlerMock()
handler.Wait(1)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
@@ -147,24 +150,35 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
handler.Wait(1)
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
handler.Wait(1)
channel <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}})
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
})
handler.Wait(1)
channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}})
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 99}}},
})
handler.Wait(1)
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
@@ -180,8 +194,14 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
}
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
handler.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
@@ -197,8 +217,14 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
handler.Wait(2)
handler2.Wait(2)
channelOne <- serviceUpdate1

View File

@@ -17,6 +17,7 @@ limitations under the License.
package proxy
import (
"fmt"
"net"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -27,7 +28,18 @@ import (
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// service-port and source address.
NextEndpoint(service types.NamespacedName, port string, srcAddr net.Addr) (string, error)
NewService(service types.NamespacedName, port string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service types.NamespacedName, port string)
NextEndpoint(service ServicePortName, srcAddr net.Addr) (string, error)
NewService(service ServicePortName, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service ServicePortName)
}
// ServicePortName carries a namespace + name + portname. This is the unique
// identfier for a load-balanced service.
type ServicePortName struct {
types.NamespacedName
Port string
}
func (spn ServicePortName) String() string {
return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port)
}

View File

@@ -35,14 +35,13 @@ import (
)
type serviceInfo struct {
portalIP net.IP
portalPort int
protocol api.Protocol
proxyPort int
socket proxySocket
timeout time.Duration
// TODO: make this an net.IP address
publicIP []string
portalIP net.IP
portalPort int
protocol api.Protocol
proxyPort int
socket proxySocket
timeout time.Duration
publicIPs []string // TODO: make this net.IP
sessionAffinityType api.AffinityType
stickyMaxAgeMinutes int
}
@@ -59,7 +58,7 @@ type proxySocket interface {
// while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service types.NamespacedName, info *serviceInfo, proxier *Proxier)
ProxyLoop(service ServicePortName, info *serviceInfo, proxier *Proxier)
}
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@@ -68,10 +67,9 @@ type tcpProxySocket struct {
net.Listener
}
func tryConnect(service types.NamespacedName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
for _, retryTimeout := range endpointDialTimeout {
// TODO: support multiple service ports
endpoint, err := proxier.loadBalancer.NextEndpoint(service, "", srcAddr)
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err
@@ -89,7 +87,7 @@ func tryConnect(service types.NamespacedName, srcAddr net.Addr, protocol string,
return nil, fmt.Errorf("failed to connect to an endpoint.")
}
func (tcp *tcpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) {
func (tcp *tcpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// The service port was closed or replaced.
@@ -164,7 +162,7 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}
func (udp *udpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) {
func (udp *udpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
@@ -209,7 +207,7 @@ func (udp *udpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *servi
}
}
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service types.NamespacedName, timeout time.Duration) (net.Conn, error) {
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortName, timeout time.Duration) (net.Conn, error) {
activeClients.mu.Lock()
defer activeClients.mu.Unlock()
@@ -305,7 +303,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[types.NamespacedName]*serviceInfo
serviceMap map[ServicePortName]*serviceInfo
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
@@ -347,7 +345,7 @@ func CreateProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[types.NamespacedName]*serviceInfo),
serviceMap: make(map[ServicePortName]*serviceInfo),
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
@@ -387,35 +385,32 @@ func (proxier *Proxier) ensurePortals() {
// clean up any stale sticky session records in the hash map.
func (proxier *Proxier) cleanupStaleStickySessions() {
for name, info := range proxier.serviceMap {
if info.sessionAffinityType != api.AffinityTypeNone {
// TODO: support multiple service ports
proxier.loadBalancer.CleanupStaleStickySessions(name, "")
}
for name := range proxier.serviceMap {
proxier.loadBalancer.CleanupStaleStickySessions(name)
}
}
// This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service types.NamespacedName, info *serviceInfo) error {
func (proxier *Proxier) stopProxy(service ServicePortName, info *serviceInfo) error {
proxier.mu.Lock()
defer proxier.mu.Unlock()
return proxier.stopProxyInternal(service, info)
}
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service types.NamespacedName, info *serviceInfo) error {
func (proxier *Proxier) stopProxyInternal(service ServicePortName, info *serviceInfo) error {
delete(proxier.serviceMap, service)
return info.socket.Close()
}
func (proxier *Proxier) getServiceInfo(service types.NamespacedName) (*serviceInfo, bool) {
func (proxier *Proxier) getServiceInfo(service ServicePortName) (*serviceInfo, bool) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
info, ok := proxier.serviceMap[service]
return info, ok
}
func (proxier *Proxier) setServiceInfo(service types.NamespacedName, info *serviceInfo) {
func (proxier *Proxier) setServiceInfo(service ServicePortName, info *serviceInfo) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.serviceMap[service] = info
@@ -424,7 +419,7 @@ func (proxier *Proxier) setServiceInfo(service types.NamespacedName, info *servi
// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service types.NamespacedName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
func (proxier *Proxier) addServiceOnPort(service ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
if err != nil {
return nil, err
@@ -444,13 +439,13 @@ func (proxier *Proxier) addServiceOnPort(service types.NamespacedName, protocol
protocol: protocol,
socket: sock,
timeout: timeout,
sessionAffinityType: api.AffinityTypeNone,
stickyMaxAgeMinutes: 180,
sessionAffinityType: api.AffinityTypeNone, // default
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
}
proxier.setServiceInfo(service, si)
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service types.NamespacedName, proxier *Proxier) {
go func(service ServicePortName, proxier *Proxier) {
defer util.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
@@ -468,51 +463,57 @@ const udpIdleTimeout = 1 * time.Minute
// shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
activeServices := make(map[types.NamespacedName]bool) // use a map as a set
for _, service := range services {
// if PortalIP is "None" or empty, skip proxying
if !api.IsServiceIPSet(&service) {
continue
}
serviceName := types.NamespacedName{service.Namespace, service.Name}
activeServices[serviceName] = true
info, exists := proxier.getServiceInfo(serviceName)
serviceIP := net.ParseIP(service.Spec.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) && ipsEqual(service.Spec.PublicIPs, info.publicIP) {
continue
}
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName.String())
err := proxier.closePortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
}
err = proxier.stopProxy(serviceName, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, service.Spec.Port, service.Spec.Protocol)
info, err := proxier.addServiceOnPort(serviceName, service.Spec.Protocol, 0, udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
}
info.portalIP = serviceIP
info.portalPort = service.Spec.Port
info.publicIP = service.Spec.PublicIPs
info.sessionAffinityType = service.Spec.SessionAffinity
// TODO: paramaterize this in the types api file as an attribute of sticky session. For now it's hardcoded to 3 hours.
info.stickyMaxAgeMinutes = 180
glog.V(4).Infof("info: %+v", info)
activeServices := make(map[ServicePortName]bool) // use a map as a set
for i := range services {
service := &services[i]
err = proxier.openPortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
// if PortalIP is "None" or empty, skip proxying
if !api.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to portal IP = %q", types.NamespacedName{service.Namespace, service.Name}, service.Spec.PortalIP)
continue
}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := ServicePortName{types.NamespacedName{service.Namespace, service.Name}, servicePort.Name}
activeServices[serviceName] = true
serviceIP := net.ParseIP(service.Spec.PortalIP)
info, exists := proxier.getServiceInfo(serviceName)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && sameConfig(info, service, servicePort) {
// Nothing changed.
continue
}
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
err := proxier.closePortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
}
err = proxier.stopProxy(serviceName, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err := proxier.addServiceOnPort(serviceName, servicePort.Protocol, 0, udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
}
info.portalIP = serviceIP
info.portalPort = servicePort.Port
info.publicIPs = service.Spec.PublicIPs
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(4).Infof("info: %+v", info)
err = proxier.openPortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
}
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
// TODO: support multiple service ports
proxier.loadBalancer.NewService(serviceName, "", info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
@@ -531,6 +532,22 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
}
}
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
if info.protocol != port.Protocol || info.portalPort != port.Port {
return false
}
if !info.portalIP.Equal(net.ParseIP(service.Spec.PortalIP)) {
return false
}
if !ipsEqual(info.publicIPs, service.Spec.PublicIPs) {
return false
}
if info.sessionAffinityType != service.Spec.SessionAffinity {
return false
}
return true
}
func ipsEqual(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {
return false
@@ -543,12 +560,12 @@ func ipsEqual(lhs, rhs []string) bool {
return true
}
func (proxier *Proxier) openPortal(service types.NamespacedName, info *serviceInfo) error {
func (proxier *Proxier) openPortal(service ServicePortName, info *serviceInfo) error {
err := proxier.openOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
}
for _, publicIP := range info.publicIP {
for _, publicIP := range info.publicIPs {
err = proxier.openOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
@@ -557,7 +574,7 @@ func (proxier *Proxier) openPortal(service types.NamespacedName, info *serviceIn
return nil
}
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) error {
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error {
// Handle traffic from containers.
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...)
@@ -582,10 +599,10 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
return nil
}
func (proxier *Proxier) closePortal(service types.NamespacedName, info *serviceInfo) error {
func (proxier *Proxier) closePortal(service ServicePortName, info *serviceInfo) error {
// Collect errors and report them all at the end.
el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
for _, publicIP := range info.publicIP {
for _, publicIP := range info.publicIPs {
el = append(el, proxier.closeOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
if len(el) == 0 {
@@ -596,7 +613,7 @@ func (proxier *Proxier) closePortal(service types.NamespacedName, info *serviceI
return errors.NewAggregate(el)
}
func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) []error {
func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) []error {
el := []error{}
// Handle traffic from containers.
@@ -675,7 +692,7 @@ var zeroIPv6 = net.ParseIP("::0")
var localhostIPv6 = net.ParseIP("::1")
// Build a slice of iptables args that are common to from-container and from-host portal rules.
func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service types.NamespacedName) []string {
func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service ServicePortName) []string {
// This list needs to include all fields as they are eventually spit out
// by iptables-save. This is because some systems do not support the
// 'iptables -C' arg, and so fall back on parsing iptables-save output.
@@ -696,7 +713,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol
}
// Build a slice of iptables args for a from-container portal rule.
func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string {
func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky.
@@ -743,7 +760,7 @@ func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int,
}
// Build a slice of iptables args for a from-host portal rule.
func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string {
func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky.

View File

@@ -195,13 +195,13 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
})
@@ -219,13 +219,13 @@ func TestTCPProxy(t *testing.T) {
func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
})
@@ -241,8 +241,88 @@ func TestUDPProxy(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}
func TestMultiPortProxy(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := ServicePortName{types.NamespacedName{"testnamespace", "echo-p"}, "p"}
serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "echo-q"}, "q"}
lb.OnUpdate([]api.Endpoints{{
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}},
}},
}, {
ObjectMeta: api.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}},
}},
}})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
waitForNumProxyLoops(t, p, 1)
svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoUDP(t, "127.0.0.1", svcInfoQ.proxyPort)
waitForNumProxyLoops(t, p, 2)
}
func TestMultiPortOnUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
serviceX := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "x"}
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: api.ServiceSpec{PortalIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Port: 80,
Protocol: "TCP",
}, {
Name: "q",
Port: 81,
Protocol: "UDP",
}}},
}})
waitForNumProxyLoops(t, p, 2)
svcInfo, exists := p.getServiceInfo(serviceP)
if !exists {
t.Fatalf("can't find serviceInfo for %s", serviceP)
}
if svcInfo.portalIP.String() != "1.2.3.4" || svcInfo.portalPort != 80 || svcInfo.protocol != "TCP" {
t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
}
svcInfo, exists = p.getServiceInfo(serviceQ)
if !exists {
t.Fatalf("can't find serviceInfo for %s", serviceQ)
}
if svcInfo.portalIP.String() != "1.2.3.4" || svcInfo.portalPort != 81 || svcInfo.protocol != "UDP" {
t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo)
}
svcInfo, exists = p.getServiceInfo(serviceX)
if exists {
t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo)
}
}
// Helper: Stops the proxy for the named service.
func stopProxyByName(proxier *Proxier, service types.NamespacedName) error {
func stopProxyByName(proxier *Proxier, service ServicePortName) error {
info, found := proxier.getServiceInfo(service)
if !found {
return fmt.Errorf("unknown service: %s", service)
@@ -252,13 +332,13 @@ func stopProxyByName(proxier *Proxier, service types.NamespacedName) error {
func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
})
@@ -287,13 +367,13 @@ func TestTCPProxyStop(t *testing.T) {
func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
})
@@ -322,13 +402,13 @@ func TestUDPProxyStop(t *testing.T) {
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
})
@@ -356,13 +436,13 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
})
@@ -390,13 +470,13 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
})
@@ -420,9 +500,15 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{PortalIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Port: svcInfo.proxyPort,
Protocol: "TCP",
}}},
}})
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo for %s", service)
@@ -433,13 +519,13 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
})
@@ -463,9 +549,15 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{PortalIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Port: svcInfo.proxyPort,
Protocol: "UDP",
}}},
}})
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
@@ -476,13 +568,13 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
})
@@ -497,9 +589,14 @@ func TestTCPProxyUpdatePort(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{PortalIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Port: 99,
Protocol: "TCP",
}}},
}})
// Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
@@ -516,13 +613,13 @@ func TestTCPProxyUpdatePort(t *testing.T) {
func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
})
@@ -536,9 +633,14 @@ func TestUDPProxyUpdatePort(t *testing.T) {
}
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{PortalIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Port: 99,
Protocol: "UDP",
}}},
}})
// Wait for the socket to actually get free.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
@@ -553,13 +655,13 @@ func TestUDPProxyUpdatePort(t *testing.T) {
func TestProxyUpdatePublicIPs(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
})
@@ -574,9 +676,18 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.portalPort, Protocol: "TCP", PortalIP: svcInfo.portalIP.String(), PublicIPs: []string{"4.3.2.1"}}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
Name: "p",
Port: svcInfo.portalPort,
Protocol: "TCP",
}},
PortalIP: svcInfo.portalIP.String(),
PublicIPs: []string{"4.3.2.1"},
},
}})
// Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
@@ -593,13 +704,13 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
func TestProxyUpdatePortal(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
})
@@ -614,33 +725,40 @@ func TestProxyUpdatePortal(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP"}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{PortalIP: "", Ports: []api.ServicePort{{
Name: "p",
Port: svcInfo.proxyPort,
Protocol: "TCP",
}}},
}})
_, exists := p.getServiceInfo(service)
if exists {
t.Fatalf("service without portalIP should not be included in the proxy")
}
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", PortalIP: ""}, Status: api.ServiceStatus{}},
})
_, exists = p.getServiceInfo(service)
if exists {
t.Fatalf("service with empty portalIP should not be included in the proxy")
}
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", PortalIP: "None"}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{PortalIP: "None", Ports: []api.ServicePort{{
Name: "p",
Port: svcInfo.proxyPort,
Protocol: "TCP",
}}},
}})
_, exists = p.getServiceInfo(service)
if exists {
t.Fatalf("service with 'None' as portalIP should not be included in the proxy")
}
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
})
p.OnUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{PortalIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Port: svcInfo.proxyPort,
Protocol: "TCP",
}}},
}})
svcInfo, exists = p.getServiceInfo(service)
if !exists {
t.Fatalf("service with portalIP set not found in the proxy")

View File

@@ -50,20 +50,10 @@ type affinityPolicy struct {
ttlMinutes int
}
// servicePort is the type that the balancer uses to key stored state.
type servicePort struct {
types.NamespacedName
port string
}
func (sp servicePort) String() string {
return fmt.Sprintf("%s:%s", sp.NamespacedName, sp.port)
}
// LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct {
lock sync.RWMutex
services map[servicePort]*balancerState
services map[ServicePortName]*balancerState
}
// Ensure this implements LoadBalancer.
@@ -86,13 +76,11 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP
// NewLoadBalancerRR returns a new LoadBalancerRR.
func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{
services: map[servicePort]*balancerState{},
services: map[ServicePortName]*balancerState{},
}
}
func (lb *LoadBalancerRR) NewService(service types.NamespacedName, port string, affinityType api.AffinityType, ttlMinutes int) error {
svcPort := servicePort{service, port}
func (lb *LoadBalancerRR) NewService(svcPort ServicePortName, affinityType api.AffinityType, ttlMinutes int) error {
lb.lock.Lock()
defer lb.lock.Unlock()
lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
@@ -100,7 +88,7 @@ func (lb *LoadBalancerRR) NewService(service types.NamespacedName, port string,
}
// This assumes that lb.lock is already held.
func (lb *LoadBalancerRR) newServiceInternal(svcPort servicePort, affinityType api.AffinityType, ttlMinutes int) *balancerState {
func (lb *LoadBalancerRR) newServiceInternal(svcPort ServicePortName, affinityType api.AffinityType, ttlMinutes int) *balancerState {
if ttlMinutes == 0 {
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
}
@@ -123,9 +111,7 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, port string, srcAddr net.Addr) (string, error) {
svcPort := servicePort{service, port}
func (lb *LoadBalancerRR) NextEndpoint(svcPort ServicePortName, srcAddr net.Addr) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
@@ -202,7 +188,7 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string {
}
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(state *balancerState, svcPort servicePort, endpoint string) {
func removeSessionAffinityByEndpoint(state *balancerState, svcPort ServicePortName, endpoint string) {
for _, affinity := range state.affinity.affinityMap {
if affinity.endpoint == endpoint {
glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort)
@@ -214,7 +200,7 @@ func removeSessionAffinityByEndpoint(state *balancerState, svcPort servicePort,
// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
// Then remove any session affinity records that are not in both lists.
// This assumes the lb.lock is held.
func (lb *LoadBalancerRR) updateAffinityMap(svcPort servicePort, newEndpoints []string) {
func (lb *LoadBalancerRR) updateAffinityMap(svcPort ServicePortName, newEndpoints []string) {
allEndpoints := map[string]int{}
for _, newEndpoint := range newEndpoints {
allEndpoints[newEndpoint] = 1
@@ -238,7 +224,7 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort servicePort, newEndpoints []
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[servicePort]bool)
registeredEndpoints := make(map[ServicePortName]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
@@ -262,7 +248,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
}
for portname := range portsToEndpoints {
svcPort := servicePort{types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}, portname}
svcPort := ServicePortName{types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}, portname}
state, exists := lb.services[svcPort]
curEndpoints := []string{}
if state != nil {
@@ -305,15 +291,12 @@ func slicesEquiv(lhs, rhs []string) bool {
return false
}
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service types.NamespacedName, port string) {
svcPort := servicePort{service, port}
func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort ServicePortName) {
lb.lock.Lock()
defer lb.lock.Unlock()
state, exists := lb.services[svcPort]
if !exists {
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", svcPort)
return
}
for ip, affinity := range state.affinity.affinityMap {

View File

@@ -67,8 +67,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "does-not-exist", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "does-not-exist"}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil {
t.Errorf("Didn't fail with non-existent service")
}
@@ -77,20 +77,20 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
}
}
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service types.NamespacedName, port string, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, port, netaddr)
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service ServicePortName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
if err != nil {
t.Errorf("Didn't find a service for %s:%s, expected %s, failed with: %v", service, port, expected, err)
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
}
if endpoint != expected {
t.Errorf("Didn't get expected endpoint for service %s:%s client %v, expected %s, got: %s", service, port, netaddr, expected, endpoint)
t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
}
}
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -103,10 +103,10 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
}
func stringsInSlice(haystack []string, needles ...string) bool {
@@ -127,8 +127,8 @@ func stringsInSlice(haystack []string, needles ...string) bool {
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -142,26 +142,27 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, "p"}].endpoints
shuffledEndpoints := loadBalancer.services[service].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
serviceP := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"}
serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}},
@@ -175,35 +176,36 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, "p"}].endpoints
shuffledEndpoints := loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
shuffledEndpoints = loadBalancer.services[servicePort{service, "q"}].endpoints
shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:2", "endpoint2:2", "endpoint3:4") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
serviceP := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"}
serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}},
@@ -221,28 +223,28 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, "p"}].endpoints
shuffledEndpoints := loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
shuffledEndpoints = loadBalancer.services[servicePort{service, "q"}].endpoints
shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:10", "endpoint2:20", "endpoint3:30") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint4"}},
@@ -256,29 +258,29 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[servicePort{service, "p"}].endpoints
shuffledEndpoints = loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[servicePort{service, "q"}].endpoints
shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint4:40", "endpoint5:50") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, "p", nil)
endpoint, err = loadBalancer.NextEndpoint(serviceP, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -286,15 +288,15 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
endpoint, err := loadBalancer.NextEndpoint(fooService, "p", nil)
fooServiceP := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"}
barServiceP := ServicePortName{types.NamespacedName{"testnamespace", "bar"}, "p"}
endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
ObjectMeta: api.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}, {IP: "endpoint3"}},
@@ -303,7 +305,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
}
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
ObjectMeta: api.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint4"}, {IP: "endpoint5"}, {IP: "endpoint6"}},
@@ -312,54 +314,54 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services[servicePort{fooService, "p"}].endpoints
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[1], nil)
shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services[servicePort{barService, "p"}].endpoints
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[2], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services[barServiceP].endpoints
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, "p", nil)
endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[2], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[2], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
}
func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "endpoint"}}, Ports: []api.EndpointPort{{Port: 1}}}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
@@ -367,13 +369,13 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
@@ -385,15 +387,15 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
@@ -401,13 +403,13 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, "", api.AffinityTypeNone, 0)
loadBalancer.NewService(service, api.AffinityTypeNone, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
@@ -420,15 +422,15 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
@@ -439,13 +441,13 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
@@ -457,14 +459,14 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0]
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
client2Endpoint := shuffledEndpoints[1]
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
client3Endpoint := shuffledEndpoints[2]
endpoints[0] = api.Endpoints{
@@ -477,7 +479,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[servicePort{service, ""}].endpoints
shuffledEndpoints = loadBalancer.services[service].endpoints
if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0]
} else if client2Endpoint == "endpoint:3" {
@@ -485,9 +487,9 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
} else if client3Endpoint == "endpoint:3" {
client3Endpoint = shuffledEndpoints[0]
}
expectEndpoint(t, loadBalancer, service, "", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, "", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, "", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
@@ -499,13 +501,13 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, "", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, "", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client6)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
@@ -513,13 +515,13 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
@@ -531,14 +533,14 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{
@@ -551,19 +553,19 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, "", nil)
endpoint, err = loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -574,12 +576,12 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(fooService, "", nil)
fooService := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(fooService, "", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(fooService, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
@@ -590,8 +592,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
},
}
barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
loadBalancer.NewService(barService, "", api.AffinityTypeClientIP, 0)
barService := ServicePortName{types.NamespacedName{"testnamespace", "bar"}, ""}
loadBalancer.NewService(barService, api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Subsets: []api.EndpointSubset{
@@ -603,38 +605,38 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services[servicePort{fooService, ""}].endpoints
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[2], client3)
shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
shuffledBarEndpoints := loadBalancer.services[servicePort{barService, ""}].endpoints
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, "", nil)
endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
shuffledBarEndpoints = loadBalancer.services[servicePort{barService, ""}].endpoints
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
shuffledBarEndpoints = loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
}