Merge pull request #24655 from njuicsgz/master

Automatic merge from submit-queue

kube-proxy userspace, enable RR if connect endpoint which is session affinity fails

for issue #24648
This commit is contained in:
k8s-merge-robot 2016-05-08 23:04:23 -07:00
commit cd3ab5d82d
4 changed files with 98 additions and 32 deletions

View File

@ -27,7 +27,7 @@ import (
type LoadBalancer interface { type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given // NextEndpoint returns the endpoint to handle a request for the given
// service-port and source address. // service-port and source address.
NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr) (string, error) NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName)
} }

View File

@ -87,8 +87,9 @@ func (tcp *tcpProxySocket) ListenPort() int {
} }
func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
sessionAffinityReset := false
for _, dialTimeout := range endpointDialTimeout { for _, dialTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
if err != nil { if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err return nil, err
@ -102,6 +103,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string
panic("Dial failed: " + err.Error()) panic("Dial failed: " + err.Error())
} }
glog.Errorf("Dial failed: %v", err) glog.Errorf("Dial failed: %v", err)
sessionAffinityReset = true
continue continue
} }
return outConn, nil return outConn, nil

View File

@ -114,7 +114,7 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint. // NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm. // The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr) (string, error) { func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we // Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters. // can prove it matters.
lb.lock.Lock() lb.lock.Lock()
@ -139,6 +139,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
if err != nil { if err != nil {
return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err) return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
} }
if !sessionAffinityReset {
sessionAffinity, exists := state.affinity.affinityMap[ipaddr] sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
// Affinity wins. // Affinity wins.
@ -148,6 +149,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
return endpoint, nil return endpoint, nil
} }
} }
}
// Take the next endpoint. // Take the next endpoint.
endpoint := state.endpoints[state.index] endpoint := state.endpoints[state.index]
state.index = (state.index + 1) % len(state.endpoints) state.index = (state.index + 1) % len(state.endpoints)

View File

@ -69,7 +69,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
var endpoints []api.Endpoints var endpoints []api.Endpoints
loadBalancer.OnEndpointsUpdate(endpoints) loadBalancer.OnEndpointsUpdate(endpoints)
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil { if err == nil {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -79,7 +79,17 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
} }
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) { func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr) endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
}
if endpoint != expected {
t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
}
}
func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
} }
@ -91,7 +101,7 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.Se
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -129,7 +139,7 @@ func stringsInSlice(haystack []string, needles ...string) bool {
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -157,7 +167,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -200,7 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -281,7 +291,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints) loadBalancer.OnEndpointsUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(serviceP, nil) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -291,7 +301,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"} barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil) endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -331,7 +341,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
// Then update the configuration by removing foo // Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:]) loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil) endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -346,7 +356,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -368,7 +378,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
ep1, err := loadBalancer.NextEndpoint(service, client1) ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -376,7 +386,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
ep2, err := loadBalancer.NextEndpoint(service, client2) ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -384,7 +394,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
ep3, err := loadBalancer.NextEndpoint(service, client3) ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -403,7 +413,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -424,7 +434,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
ep1, err := loadBalancer.NextEndpoint(service, client1) ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -432,7 +442,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1)
ep2, err := loadBalancer.NextEndpoint(service, client2) ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -440,7 +450,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2)
ep3, err := loadBalancer.NextEndpoint(service, client3) ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err) t.Errorf("Didn't find a service for %s: %v", service, err)
} }
@ -465,7 +475,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -539,7 +549,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil) endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -588,7 +598,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints) loadBalancer.OnEndpointsUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, nil) endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -600,7 +610,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(fooService, nil) endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -649,7 +659,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
// Then update the configuration by removing foo // Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:]) loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, nil) endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -663,3 +673,55 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
} }
func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// Call NewService() before OnEndpointsUpdate()
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
}