Squashed commit of the following:

commit 7bf1a05f61b78196c8d272e0d55980ba2254e81d
Author: gaozheng <gaozheng0123@163.com>
Date:   Thu Apr 28 01:23:42 2016 +0000

    fix gofmt

commit 54f6fa6ca76ee0fc5c4f8609fb2f875111ce2141
Author: Gao Zheng <gaozheng0123@163.com>
Date:   Sat Apr 23 13:09:41 2016 +0000

    reset session affinity if endpoint is unconnected
This commit is contained in:
Gao Zheng 2016-05-03 01:36:32 +00:00 committed by gaozheng
parent 3a4f179c75
commit c75cb94be6
4 changed files with 98 additions and 32 deletions

View File

@ -27,7 +27,7 @@ import (
type LoadBalancer interface { type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given // NextEndpoint returns the endpoint to handle a request for the given
// service-port and source address. // service-port and source address.
NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr) (string, error) NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName)
} }

View File

@ -87,8 +87,9 @@ func (tcp *tcpProxySocket) ListenPort() int {
} }
func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
sessionAffinityReset := false
for _, dialTimeout := range endpointDialTimeout { for _, dialTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
if err != nil { if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err return nil, err
@ -102,6 +103,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string
panic("Dial failed: " + err.Error()) panic("Dial failed: " + err.Error())
} }
glog.Errorf("Dial failed: %v", err) glog.Errorf("Dial failed: %v", err)
sessionAffinityReset = true
continue continue
} }
return outConn, nil return outConn, nil

View File

@ -114,7 +114,7 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint. // NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm. // The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr) (string, error) { func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we // Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters. // can prove it matters.
lb.lock.Lock() lb.lock.Lock()
@ -139,6 +139,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
if err != nil { if err != nil {
return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err) return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
} }
if !sessionAffinityReset {
sessionAffinity, exists := state.affinity.affinityMap[ipaddr] sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
// Affinity wins. // Affinity wins.
@ -148,6 +149,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
return endpoint, nil return endpoint, nil
} }
} }
}
// Take the next endpoint. // Take the next endpoint.
endpoint := state.endpoints[state.index] endpoint := state.endpoints[state.index]
state.index = (state.index + 1) % len(state.endpoints) state.index = (state.index + 1) % len(state.endpoints)

View File

@ -69,7 +69,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
var endpoints []api.Endpoints var endpoints []api.Endpoints
loadBalancer.OnEndpointsUpdate(endpoints) loadBalancer.OnEndpointsUpdate(endpoints)
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil { if err == nil {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -79,7 +79,17 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
} }
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) { func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr) endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
}
if endpoint != expected {
t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
}
}
func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
} }
@ -91,7 +101,7 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.Se
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -129,7 +139,7 @@ func stringsInSlice(haystack []string, needles ...string) bool {
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -157,7 +167,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -200,7 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -281,7 +291,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints) loadBalancer.OnEndpointsUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(serviceP, nil) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -291,7 +301,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"} barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil) endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -331,7 +341,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
// Then update the configuration by removing foo // Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:]) loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil) endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -346,7 +356,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -368,7 +378,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
ep1, err := loadBalancer.NextEndpoint(service, client1) ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -376,7 +386,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
ep2, err := loadBalancer.NextEndpoint(service, client2) ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -384,7 +394,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
ep3, err := loadBalancer.NextEndpoint(service, client3) ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -403,7 +413,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -424,7 +434,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
ep1, err := loadBalancer.NextEndpoint(service, client1) ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -432,7 +442,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
ep2, err := loadBalancer.NextEndpoint(service, client2) ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -440,7 +450,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
ep3, err := loadBalancer.NextEndpoint(service, client3) ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -465,7 +475,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -539,7 +549,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -588,7 +598,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints) loadBalancer.OnEndpointsUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, nil) endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -600,7 +610,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(fooService, nil) endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -649,7 +659,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
// Then update the configuration by removing foo // Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:]) loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, nil) endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -663,3 +673,55 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
} }
func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// Call NewService() before OnEndpointsUpdate()
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
}