diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index f32f05fc8a1..e68bedcc933 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -27,7 +27,7 @@ import ( type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given // service-port and source address. - NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr) (string, error) + NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error CleanupStaleStickySessions(service proxy.ServicePortName) } diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index 97f42cc73e6..3cab4444d28 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -87,8 +87,9 @@ func (tcp *tcpProxySocket) ListenPort() int { } func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { + sessionAffinityReset := false for _, dialTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) + endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -102,6 +103,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string panic("Dial failed: " + err.Error()) } glog.Errorf("Dial failed: %v", err) + sessionAffinityReset = true continue } return outConn, nil diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index 86a93e25618..55021dc0b80 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -114,7 +114,7 @@ func isSessionAffinity(affinity *affinityPolicy) bool { // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. -func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr) (string, error) { +func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) { // Coarse locking is simple. We can get more fine-grained if/when we // can prove it matters. lb.lock.Lock() @@ -139,13 +139,15 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne if err != nil { return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err) } - sessionAffinity, exists := state.affinity.affinityMap[ipaddr] - if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { - // Affinity wins. - endpoint := sessionAffinity.endpoint - sessionAffinity.lastUsed = time.Now() - glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", svcPort, ipaddr, sessionAffinity, endpoint) - return endpoint, nil + if !sessionAffinityReset { + sessionAffinity, exists := state.affinity.affinityMap[ipaddr] + if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { + // Affinity wins. + endpoint := sessionAffinity.endpoint + sessionAffinity.lastUsed = time.Now() + glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", svcPort, ipaddr, sessionAffinity, endpoint) + return endpoint, nil + } } } // Take the next endpoint. diff --git a/pkg/proxy/userspace/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go index d04bb73afe1..9587cb71a1d 100644 --- a/pkg/proxy/userspace/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -69,7 +69,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { var endpoints []api.Endpoints loadBalancer.OnEndpointsUpdate(endpoints) service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} - endpoint, err := loadBalancer.NextEndpoint(service, nil) + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil { t.Errorf("Didn't fail with non-existent service") } @@ -79,7 +79,17 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { } func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) { - endpoint, err := loadBalancer.NextEndpoint(service, netaddr) + endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false) + if err != nil { + t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) + } + if endpoint != expected { + t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint) + } +} + +func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) { + endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true) if err != nil { t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) } @@ -91,7 +101,7 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.Se func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} - endpoint, err := loadBalancer.NextEndpoint(service, nil) + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -129,7 +139,7 @@ func stringsInSlice(haystack []string, needles ...string) bool { func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} - endpoint, err := loadBalancer.NextEndpoint(service, nil) + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -157,7 +167,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { loadBalancer := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"} - endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) + endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -200,7 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { loadBalancer := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"} - endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) + endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -281,7 +291,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} loadBalancer.OnEndpointsUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint(serviceP, nil) + endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -291,7 +301,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { loadBalancer := NewLoadBalancerRR() fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"} - endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil) + endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -331,7 +341,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { // Then update the configuration by removing foo loadBalancer.OnEndpointsUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil) + endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -346,7 +356,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} - endpoint, err := loadBalancer.NextEndpoint(service, nil) + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -368,7 +378,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} - ep1, err := loadBalancer.NextEndpoint(service, client1) + ep1, err := loadBalancer.NextEndpoint(service, client1, false) if err != nil { t.Errorf("Didn't find a service for %s: %v", service, err) } @@ -376,7 +386,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1) - ep2, err := loadBalancer.NextEndpoint(service, client2) + ep2, err := loadBalancer.NextEndpoint(service, client2, false) if err != nil { t.Errorf("Didn't find a service for %s: %v", service, err) } @@ -384,7 +394,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2) - ep3, err := loadBalancer.NextEndpoint(service, client3) + ep3, err := loadBalancer.NextEndpoint(service, client3, false) if err != nil { t.Errorf("Didn't find a service for %s: %v", service, err) } @@ -403,7 +413,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} - endpoint, err := loadBalancer.NextEndpoint(service, nil) + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -424,7 +434,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} - ep1, err := loadBalancer.NextEndpoint(service, client1) + ep1, err := loadBalancer.NextEndpoint(service, client1, false) if err != nil { t.Errorf("Didn't find a service for %s: %v", service, err) } @@ -432,7 +442,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { expectEndpoint(t, loadBalancer, service, ep1, client1) expectEndpoint(t, loadBalancer, service, ep1, client1) - ep2, err := loadBalancer.NextEndpoint(service, client2) + ep2, err := loadBalancer.NextEndpoint(service, client2, false) if err != nil { t.Errorf("Didn't find a service for %s: %v", service, err) } @@ -440,7 +450,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { expectEndpoint(t, loadBalancer, service, ep2, client2) expectEndpoint(t, loadBalancer, service, ep2, client2) - ep3, err := loadBalancer.NextEndpoint(service, client3) + ep3, err := loadBalancer.NextEndpoint(service, client3, false) if err != nil { t.Errorf("Didn't find a service for %s: %v", service, err) } @@ -465,7 +475,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} - endpoint, err := loadBalancer.NextEndpoint(service, nil) + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -539,7 +549,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} - endpoint, err := loadBalancer.NextEndpoint(service, nil) + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -588,7 +598,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} loadBalancer.OnEndpointsUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint(service, nil) + endpoint, err = loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -600,7 +610,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} - endpoint, err := loadBalancer.NextEndpoint(fooService, nil) + endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -649,7 +659,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { // Then update the configuration by removing foo loadBalancer.OnEndpointsUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint(fooService, nil) + endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -663,3 +673,55 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) } + +func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + // Call NewService() before OnEndpointsUpdate() + loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + + ep1, err := loadBalancer.NextEndpoint(service, client1, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + + ep2, err := loadBalancer.NextEndpoint(service, client2, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + + ep3, err := loadBalancer.NextEndpoint(service, client3, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1) + + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3) +}