diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 6d486943f58..f4615be7287 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -220,7 +220,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err var proxier proxy.ProxyProvider var servicesHandler proxyconfig.ServiceConfigHandler + // TODO: Migrate all handlers to EndpointsHandler type and + // get rid of this one. var endpointsHandler proxyconfig.EndpointsConfigHandler + var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) if proxyMode == proxyModeIPTables { @@ -247,7 +250,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err } proxier = proxierIPTables servicesHandler = proxierIPTables - endpointsHandler = proxierIPTables + endpointsEventHandler = proxierIPTables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(0).Info("Tearing down userspace rules.") userspace.CleanupLeftovers(iptInterface) @@ -257,7 +260,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // our config.EndpointsConfigHandler. loadBalancer := winuserspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer + // set EndpointsHandler to our loadBalancer endpointsHandler = loadBalancer proxierUserspace, err := winuserspace.NewProxier( loadBalancer, @@ -278,7 +281,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // our config.EndpointsConfigHandler. loadBalancer := userspace.NewLoadBalancerRR() // set EndpointsConfigHandler to our loadBalancer - endpointsHandler = loadBalancer + endpointsEventHandler = loadBalancer proxierUserspace, err := userspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), @@ -318,7 +321,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) - endpointsConfig.RegisterHandler(endpointsHandler) + if endpointsHandler != nil { + endpointsConfig.RegisterHandler(endpointsHandler) + } + if endpointsEventHandler != nil { + endpointsConfig.RegisterEventHandler(endpointsEventHandler) + } go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 375c26fa488..73bfae3caf8 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -60,12 +60,31 @@ type EndpointsConfigHandler interface { OnEndpointsUpdate(endpoints []*api.Endpoints) } +// EndpointsHandler is an abstract interface o objects which receive +// notifications about endpoints object changes. +type EndpointsHandler interface { + // OnEndpointsAdd is called whenever creation of new endpoints object + // is observed. + OnEndpointsAdd(endpoints *api.Endpoints) + // OnEndpointsUpdate is called whenever modification of an existing + // endpoints object is observed. + OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) + // OnEndpointsDelete is called whever deletion of an existing endpoints + // object is observed. + OnEndpointsDelete(endpoints *api.Endpoints) + // OnEndpointsSynced is called once all the initial event handlers were + // called and the state is fully propagated to local cache. + OnEndpointsSynced() +} + // EndpointsConfig tracks a set of endpoints configurations. // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change. type EndpointsConfig struct { - lister listers.EndpointsLister - listerSynced cache.InformerSynced - handlers []EndpointsConfigHandler + lister listers.EndpointsLister + listerSynced cache.InformerSynced + eventHandlers []EndpointsHandler + // TODO: Remove handlers by switching them to eventHandlers. + handlers []EndpointsConfigHandler // updates channel is used to trigger registered handlers. updates chan struct{} stop chan struct{} @@ -101,6 +120,11 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.handlers = append(c.handlers, handler) } +// RegisterEventHandler registers a handler which is called on every endpoints change. +func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + // Run starts the goroutine responsible for calling registered handlers. func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { if !cache.WaitForCacheSync(stopCh, c.listerSynced) { @@ -111,6 +135,10 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { // We have synced informers. Now we can start delivering updates // to the registered handler. go func() { + for i := range c.eventHandlers { + glog.V(3).Infof("Calling handler.OnEndpointsSynced()") + c.eventHandlers[i].OnEndpointsSynced() + } for { select { case <-c.updates: @@ -140,15 +168,54 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { }() } -func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) { +func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) { + endpoints, ok := obj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnEndpointsAdd") + c.eventHandlers[i].OnEndpointsAdd(endpoints) + } c.dispatchUpdate() } -func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) { +func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) { + oldEndpoints, ok := oldObj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj)) + return + } + endpoints, ok := newObj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnEndpointsUpdate") + c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints) + } c.dispatchUpdate() } -func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) { +func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { + endpoints, ok := obj.(*api.Endpoints) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + if endpoints, ok = tombstone.Obj.(*api.Endpoints); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnEndpointsUpdate") + c.eventHandlers[i].OnEndpointsDelete(endpoints) + } c.dispatchUpdate() } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index cb3edbae595..62d8af048fa 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -196,8 +196,8 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se return info } +type endpointsMap map[types.NamespacedName]*api.Endpoints type proxyServiceMap map[proxy.ServicePortName]*serviceInfo - type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo // Proxier is an iptables based proxy for connections between a localhost:lport @@ -211,9 +211,14 @@ type Proxier struct { // pointers are shared with higher layers of kube-proxy. They are guaranteed // to not be modified in the meantime, but also require to be not modified // by Proxier. - // nil until we have seen an On*Update event. - allServices []*api.Service - allEndpoints []*api.Endpoints + allEndpoints endpointsMap + // allServices is nil until we have seen an OnServiceUpdate event. + allServices []*api.Service + + // endpointsSynced is set to true when endpoints are synced after startup. + // This is used to avoid updating iptables with some partial data after + // kube-proxy restart. + endpointsSynced bool throttle flowcontrol.RateLimiter @@ -328,6 +333,7 @@ func NewProxier(ipt utiliptables.Interface, serviceMap: make(proxyServiceMap), endpointsMap: make(proxyEndpointMap), portsMap: make(map[localPort]closeable), + allEndpoints: make(endpointsMap), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, throttle: throttle, @@ -532,19 +538,42 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { proxier.syncProxyRules(syncReasonServices) } -// OnEndpointsUpdate takes in a slice of updated endpoints. -func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { +func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + proxier.mu.Lock() defer proxier.mu.Unlock() - if proxier.allEndpoints == nil { - glog.V(2).Info("Received first Endpoints update") - } - proxier.allEndpoints = allEndpoints + proxier.allEndpoints[namespacedName] = endpoints + proxier.syncProxyRules(syncReasonEndpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.allEndpoints[namespacedName] = endpoints + proxier.syncProxyRules(syncReasonEndpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + delete(proxier.allEndpoints, namespacedName) + proxier.syncProxyRules(syncReasonEndpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.endpointsSynced = true proxier.syncProxyRules(syncReasonEndpoints) } // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { +func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { // return values newMap = make(proxyEndpointMap) @@ -552,8 +581,8 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap staleSet = make(map[endpointServicePair]bool) // Update endpoints for services. - for i := range allEndpoints { - accumulateEndpointsMap(allEndpoints[i], hostname, &newMap) + for _, endpoints := range allEndpoints { + accumulateEndpointsMap(endpoints, hostname, &newMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP @@ -608,7 +637,6 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap // NOTE: endpoints object should NOT be modified. // // TODO: this could be simplified: -// - hostPortInfo and endpointsInfo overlap too much // - the test for this is overlapped by the test for buildNewEndpointsMap // - naming is poor and responsibilities are muddled func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) { @@ -733,7 +761,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start)) }() // don't sync rules till we've received services and endpoints - if proxier.allEndpoints == nil || proxier.allServices == nil { + if !proxier.endpointsSynced || proxier.allServices == nil { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index d1f1f715079..4a0d1575985 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -383,16 +383,17 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. return &Proxier{ - exec: &exec.FakeExec{}, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - iptables: ipt, - clusterCIDR: "10.0.0.0/24", - allEndpoints: []*api.Endpoints{}, - allServices: []*api.Service{}, - hostname: testHostname, - portsMap: make(map[localPort]closeable), - portMapper: &fakePortOpener{[]*localPort{}}, - healthChecker: newFakeHealthChecker(), + exec: &exec.FakeExec{}, + serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + iptables: ipt, + clusterCIDR: "10.0.0.0/24", + allEndpoints: make(endpointsMap), + allServices: []*api.Service{}, + endpointsSynced: true, + hostname: testHostname, + portsMap: make(map[localPort]closeable), + portMapper: &fakePortOpener{[]*localPort{}}, + healthChecker: newFakeHealthChecker(), } } @@ -611,7 +612,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { } epIP := "10.180.0.1" - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -623,7 +624,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -675,7 +676,7 @@ func TestLoadBalancer(t *testing.T) { } epIP := "10.180.0.1" - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -687,7 +688,7 @@ func TestLoadBalancer(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -732,7 +733,7 @@ func TestNodePort(t *testing.T) { } epIP := "10.180.0.1" - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -744,7 +745,7 @@ func TestNodePort(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -826,7 +827,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { epIP2 := "10.180.2.1" epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort) epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort) - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -842,7 +843,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -917,7 +918,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable epIP2 := "10.180.2.1" epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort) epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort) - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -933,7 +934,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -1405,6 +1406,15 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap return ept } +func makeEndpointsMap(allEndpoints ...*api.Endpoints) endpointsMap { + result := make(endpointsMap) + for _, endpoints := range allEndpoints { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + result[namespacedName] = endpoints + } + return result +} + func makeNSN(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } @@ -1420,21 +1430,21 @@ func Test_buildNewEndpointsMap(t *testing.T) { var nodeName = "host" testCases := []struct { - newEndpoints []*api.Endpoints + newEndpoints map[types.NamespacedName]*api.Endpoints oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedStale []endpointServicePair expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - newEndpoints: []*api.Endpoints{}, + newEndpoints: map[types.NamespacedName]*api.Endpoints{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedStale: []endpointServicePair{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[1]: no change, unnamed port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1445,7 +1455,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", false}, @@ -1460,7 +1470,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[2]: no change, named port, local - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1473,7 +1483,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", true}, @@ -1490,7 +1500,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[3]: no change, multiple subsets - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1510,7 +1520,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1531,7 +1541,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[4]: no change, multiple subsets, multiple ports, local - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1555,7 +1565,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", true}, @@ -1584,7 +1594,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[5]: no change, multiple endpoints, subsets, IPs, and ports - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1633,7 +1643,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1693,7 +1703,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[6]: add an Endpoints - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1705,7 +1715,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { @@ -1718,7 +1728,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[7]: remove an Endpoints - newEndpoints: []*api.Endpoints{ /* empty */ }, + newEndpoints: makeEndpointsMap(), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", true}, @@ -1732,7 +1742,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[8]: add an IP and port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1750,7 +1760,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1772,7 +1782,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[9]: remove an IP and port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1784,7 +1794,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1813,7 +1823,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[10]: add a subset - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1834,7 +1844,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1854,7 +1864,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[11]: remove a subset - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1866,7 +1876,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1887,7 +1897,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[12]: rename a port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1899,7 +1909,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1917,7 +1927,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[13]: renumber a port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1929,7 +1939,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1947,7 +1957,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[14]: complex add and remove - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1995,7 +2005,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index ef07a158ad7..f00fa30162a 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -202,14 +202,12 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -232,14 +230,12 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -262,14 +258,12 @@ func TestUDPProxy(t *testing.T) { func TestUDPProxyTimeout(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -297,19 +291,20 @@ func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} - lb.OnEndpointsUpdate([]*api.Endpoints{{ + lb.OnEndpointsAdd(&api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}}, }}, - }, { + }) + lb.OnEndpointsAdd(&api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}}, }}, - }}) + }) fexec := makeFakeExec() @@ -397,14 +392,12 @@ func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -444,14 +437,12 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -485,14 +476,12 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -525,14 +514,12 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -572,7 +559,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) fexec := makeFakeExec() @@ -601,7 +588,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ @@ -628,7 +615,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) fexec := makeFakeExec() @@ -657,7 +644,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ @@ -677,14 +664,12 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -728,14 +713,12 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -776,14 +759,12 @@ func TestUDPProxyUpdatePort(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) fexec := makeFakeExec() @@ -838,7 +819,7 @@ func TestProxyUpdatePortal(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) fexec := makeFakeExec() @@ -890,7 +871,7 @@ func TestProxyUpdatePortal(t *testing.T) { Protocol: "TCP", }}}, }}) - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) svcInfo, exists = p.getServiceInfo(service) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index 5d58c3c25e3..6190f6013b7 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -243,65 +243,92 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn } } -// OnEndpointsUpdate manages the registered service endpoints. -// Registered endpoints are updated if found in the update set or -// unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { - registeredEndpoints := make(map[proxy.ServicePortName]bool) +// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that +// portname. Expode Endpoints.Subsets[*] into this structure. +func buildPortsToEndpointsMap(endpoints *api.Endpoints) map[string][]hostPortPair { + portsToEndpoints := map[string][]hostPortPair{} + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) + // Ignore the protocol field - we'll get that from the Service objects. + } + } + } + return portsToEndpoints +} + +func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + lb.lock.Lock() defer lb.lock.Unlock() - // Update endpoints for services. - for i := range allEndpoints { - // svcEndpoints object should NOT be modified. - svcEndpoints := allEndpoints[i] + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + state, exists := lb.services[svcPort] - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortPair{} - for i := range svcEndpoints.Subsets { - ss := &svcEndpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) - // Ignore the protocol field - we'll get that from the Service objects. - } - } - } + if !exists || state == nil || len(newEndpoints) > 0 { + glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + lb.updateAffinityMap(svcPort, newEndpoints) + // OnEndpointsAdd can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} - state, exists := lb.services[svcPort] - curEndpoints := []string{} - if state != nil { - curEndpoints = state.endpoints - } - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - - if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { - glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) - lb.updateAffinityMap(svcPort, newEndpoints) - // OnEndpointsUpdate can be called without NewService being called externally. - // To be safe we will call it here. A new service will only be created - // if one does not already exist. The affinity will be updated - // later, once NewService is called. - state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) - state.endpoints = slice.ShuffleStrings(newEndpoints) - - // Reset the round-robin index. - state.index = 0 - } - registeredEndpoints[svcPort] = true + // Reset the round-robin index. + state.index = 0 } } - // Remove endpoints missing from the update. - for k := range lb.services { - if _, exists := registeredEndpoints[k]; !exists { - glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k) +} + +func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints) + registeredEndpoints := make(map[proxy.ServicePortName]bool) + + lb.lock.Lock() + defer lb.lock.Unlock() + + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + state, exists := lb.services[svcPort] + + curEndpoints := []string{} + if state != nil { + curEndpoints = state.endpoints + } + + if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { + glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + lb.updateAffinityMap(svcPort, newEndpoints) + // OnEndpointsUpdate can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) + + // Reset the round-robin index. + state.index = 0 + } + registeredEndpoints[svcPort] = true + } + + // Now remove all endpoints missing from the update. + for portname := range oldPortsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: oldEndpoints.Namespace, Name: oldEndpoints.Name}, Port: portname} + if _, exists := registeredEndpoints[svcPort]; !exists { + glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) // Reset but don't delete. - state := lb.services[k] + state := lb.services[svcPort] state.endpoints = []string{} state.index = 0 state.affinity.affinityMap = map[string]*affinityState{} @@ -309,6 +336,27 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { } } +func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + + lb.lock.Lock() + defer lb.lock.Unlock() + + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) + // If the service is still around, reset but don't delete. + if state, ok := lb.services[svcPort]; ok { + state.endpoints = []string{} + state.index = 0 + state.affinity.affinityMap = map[string]*affinityState{} + } + } +} + +func (lb *LoadBalancerRR) OnEndpointsSynced() { +} + // Tests whether two slices are equivalent. This sorts both slices in-place. func slicesEquiv(lhs, rhs []string) bool { if len(lhs) != len(rhs) { diff --git a/pkg/proxy/userspace/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go index 732179d8080..de3a64de77b 100644 --- a/pkg/proxy/userspace/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -67,8 +67,6 @@ func TestFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - var endpoints []*api.Endpoints - loadBalancer.OnEndpointsUpdate(endpoints) service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil { @@ -106,15 +104,14 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Name: "p", Port: 40}}, }}, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) @@ -144,15 +141,14 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint"}}, Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}}, }}, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) shuffledEndpoints := loadBalancer.services[service].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { @@ -172,8 +168,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -186,7 +181,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") { @@ -215,8 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -233,7 +227,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") { @@ -255,7 +249,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -268,7 +262,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") { @@ -289,8 +283,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) // Clear endpoints - endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} - loadBalancer.OnEndpointsUpdate(endpoints) + endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false) if err == nil || len(endpoint) != 0 { @@ -306,8 +300,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 2) - endpoints[0] = &api.Endpoints{ + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -316,7 +309,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - endpoints[1] = &api.Endpoints{ + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -325,7 +318,8 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints1) + loadBalancer.OnEndpointsAdd(endpoints2) shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil) @@ -341,7 +335,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil) // Then update the configuration by removing foo - loadBalancer.OnEndpointsUpdate(endpoints[1:]) + loadBalancer.OnEndpointsDelete(endpoints1) endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -364,8 +358,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -373,7 +366,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} @@ -420,15 +413,14 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { } // Call OnEndpointsUpdate() before NewService() - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} @@ -482,8 +474,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -492,7 +483,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] @@ -503,7 +494,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -512,7 +503,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[service].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] @@ -525,7 +516,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) - endpoints[0] = &api.Endpoints{ + endpointsv3 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -534,7 +525,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) @@ -556,8 +547,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -566,7 +556,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) @@ -577,7 +567,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -586,7 +576,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) @@ -596,8 +586,8 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Clear endpoints - endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} - loadBalancer.OnEndpointsUpdate(endpoints) + endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) endpoint, err = loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { @@ -616,8 +606,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 2) - endpoints[0] = &api.Endpoints{ + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -628,7 +617,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { } barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) - endpoints[1] = &api.Endpoints{ + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -637,7 +626,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints1) + loadBalancer.OnEndpointsAdd(endpoints2) shuffledFooEndpoints := loadBalancer.services[fooService].endpoints expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) @@ -659,7 +649,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) // Then update the configuration by removing foo - loadBalancer.OnEndpointsUpdate(endpoints[1:]) + loadBalancer.OnEndpointsDelete(endpoints1) endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -685,8 +675,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -694,7 +683,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}