diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 292e3ea997e..1c6e7c427f1 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -96,6 +96,8 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort ServicePortName, affinityTy if _, exists := lb.services[svcPort]; !exists { lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort) + } else if affinityType != "" { + lb.services[svcPort].affinity.affinityType = affinityType } return lb.services[svcPort] } @@ -261,8 +263,9 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { lb.updateAffinityMap(svcPort, newEndpoints) // OnUpdate can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created - // if one does not already exist. - state = lb.newServiceInternal(svcPort, api.ServiceAffinityNone, 0) + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) state.endpoints = slice.ShuffleStrings(newEndpoints) // Reset the round-robin index. diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go index e03e4a88ca5..ca549b51d01 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/roundrobin_test.go @@ -342,32 +342,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil) } -func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { - client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} - client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} - loadBalancer := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} - endpoint, err := loadBalancer.NextEndpoint(service, nil) - if err == nil || len(endpoint) != 0 { - t.Errorf("Didn't fail with non-existent service") - } - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "endpoint"}}, Ports: []api.EndpointPort{{Port: 1}}}}, - } - loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, service, "endpoint:1", client1) - expectEndpoint(t, loadBalancer, service, "endpoint:1", client1) - expectEndpoint(t, loadBalancer, service, "endpoint:1", client2) - expectEndpoint(t, loadBalancer, service, "endpoint:1", client2) -} - -func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { - client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} - client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} - client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} +func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} endpoint, err := loadBalancer.NextEndpoint(service, nil) @@ -375,33 +350,56 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } + // Call NewService() before OnUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{IP: "endpoint"}}, - Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}}, - }, + {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services[service].endpoints - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) -} -func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + + ep1, err := loadBalancer.NextEndpoint(service, client1) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + + ep2, err := loadBalancer.NextEndpoint(service, client2) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + + ep3, err := loadBalancer.NextEndpoint(service, client3) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) +} + +func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} endpoint, err := loadBalancer.NextEndpoint(service, nil) @@ -409,28 +407,52 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService(service, api.ServiceAffinityNone, 0) + // Call OnUpdate() before NewService() endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{IP: "endpoint"}}, - Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}}, - }, + {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, }, } loadBalancer.OnUpdate(endpoints) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - shuffledEndpoints := loadBalancer.services[service].endpoints - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client2) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client2) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client3) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1) + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + + ep1, err := loadBalancer.NextEndpoint(service, client1) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + + ep2, err := loadBalancer.NextEndpoint(service, client2) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + + ep3, err := loadBalancer.NextEndpoint(service, client3) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) } func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {