diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 1cbc78db5bc..ca9c2cd183e 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -194,18 +194,19 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
 }
 
 type endpointsChange struct {
-	previous *api.Endpoints
-	current  *api.Endpoints
+	previous proxyEndpointsMap
+	current  proxyEndpointsMap
 }
 
 type endpointsChangeMap struct {
 	sync.Mutex
-	items map[types.NamespacedName]*endpointsChange
+	hostname string
+	items    map[types.NamespacedName]*endpointsChange
 }
 
 type serviceChange struct {
-	previous *api.Service
-	current  *api.Service
+	previous proxyServiceMap
+	current  proxyServiceMap
 }
 
 type serviceChangeMap struct {
@@ -216,23 +217,34 @@ type serviceChangeMap struct {
 type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
 type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
 
-func newEndpointsChangeMap() endpointsChangeMap {
+func newEndpointsChangeMap(hostname string) endpointsChangeMap {
 	return endpointsChangeMap{
-		items: make(map[types.NamespacedName]*endpointsChange),
+		hostname: hostname,
+		items:    make(map[types.NamespacedName]*endpointsChange),
 	}
 }
 
-func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) {
+func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
 	ecm.Lock()
 	defer ecm.Unlock()
 
 	change, exists := ecm.items[*namespacedName]
 	if !exists {
 		change = &endpointsChange{}
-		change.previous = previous
+		change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
 		ecm.items[*namespacedName] = change
 	}
-	change.current = current
+	change.current = endpointsToEndpointsMap(current, ecm.hostname)
+	if reflect.DeepEqual(change.previous, change.current) {
+		delete(ecm.items, *namespacedName)
+		return false
+	}
+	// TODO: Instead of returning true/false, we should consider returning whether
+	// the map contains any elements or not. Currently, if the change is
+	// "reverting" some previous endpoints update, but there are still some other
+	// modified endpoints, we will return false, even though there are some changes
+	// to apply.
+	return true
 }
 
 func newServiceChangeMap() serviceChangeMap {
@@ -241,17 +253,60 @@ func newServiceChangeMap() serviceChangeMap {
 	}
 }
 
-func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
+func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
 	scm.Lock()
 	defer scm.Unlock()
 
 	change, exists := scm.items[*namespacedName]
 	if !exists {
 		change = &serviceChange{}
-		change.previous = previous
+		change.previous = serviceToServiceMap(previous)
 		scm.items[*namespacedName] = change
 	}
-	change.current = current
+	change.current = serviceToServiceMap(current)
+	if reflect.DeepEqual(change.previous, change.current) {
+		delete(scm.items, *namespacedName)
+		return false
+	}
+	// TODO: Instead of returning true/false, we should consider returning whether
+	// the map contains any elements or not. Currently, if the change is
+	// "reverting" some previous service update, but there are still some other
+	// modified services, we will return false, even though there are some changes
+	// to apply.
+	return true
+}
+
+func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
+	existingPorts := sets.NewString()
+	for serviceName, info := range other {
+		existingPorts.Insert(serviceName.Port)
+		_, exists := (*sm)[serviceName]
+		if !exists {
+			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
+		} else {
+			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
+		}
+		(*sm)[serviceName] = info
+	}
+	return existingPorts
+}
+
+func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
+	for serviceName := range other {
+		if existingPorts.Has(serviceName.Port) {
+			continue
+		}
+		info, exists := (*sm)[serviceName]
+		if exists {
+			glog.V(1).Infof("Removing service %q", serviceName)
+			if info.protocol == api.ProtocolUDP {
+				staleServices.Insert(info.clusterIP.String())
+			}
+			delete(*sm, serviceName)
+		} else {
+			glog.Errorf("Service %q removed, but doesn't exist", serviceName)
+		}
+	}
 }
 
 func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
@@ -410,7 +465,7 @@ func NewProxier(ipt utiliptables.Interface,
 		serviceMap:       make(proxyServiceMap),
 		serviceChanges:   newServiceChangeMap(),
 		endpointsMap:     make(proxyEndpointsMap),
-		endpointsChanges: newEndpointsChangeMap(),
+		endpointsChanges: newEndpointsChangeMap(hostname),
 		syncPeriod:       syncPeriod,
 		minSyncPeriod:    minSyncPeriod,
 		throttle:         throttle,
@@ -542,30 +597,48 @@ func (proxier *Proxier) SyncLoop() {
 
 func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, nil, service)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, nil, service) {
+		// TODO(wojtek-t): If the initial sync of the informer for either endpoints
+		// or services is not finished, it doesn't make sense to call syncProxyRules,
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after a kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but that is not critical since it can only happen right
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, oldService, service)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, oldService, service) {
+		// TODO(wojtek-t): If the initial sync of the informer for either endpoints
+		// or services is not finished, it doesn't make sense to call syncProxyRules,
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after a kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but that is not critical since it can only happen right
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, service, nil)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, service, nil) {
+		// TODO(wojtek-t): If the initial sync of the informer for either endpoints
+		// or services is not finished, it doesn't make sense to call syncProxyRules,
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after a kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but that is not critical since it can only happen right
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceSynced() {
 	proxier.mu.Lock()
 	proxier.servicesSynced = true
 	proxier.mu.Unlock()
-
+	// Call it unconditionally - this is called once per lifetime.
 	proxier.syncProxyRules(syncReasonServices)
 }
 
@@ -583,67 +656,6 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
 	return false
 }
 
-func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
-	if service == nil {
-		return false, nil
-	}
-	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if shouldSkipService(svcName, service) {
-		return false, nil
-	}
-	syncRequired := false
-	existingPorts := sets.NewString()
-	for i := range service.Spec.Ports {
-		servicePort := &service.Spec.Ports[i]
-		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
-		existingPorts.Insert(servicePort.Name)
-		info := newServiceInfo(serviceName, servicePort, service)
-		oldInfo, exists := (*sm)[serviceName]
-		equal := reflect.DeepEqual(info, oldInfo)
-		if !exists {
-			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
-		} else if !equal {
-			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
-		}
-		if !equal {
-			(*sm)[serviceName] = info
-			syncRequired = true
-		}
-	}
-	return syncRequired, existingPorts
-}
-
-// <staleServices> are modified by this function with detected stale services.
-func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
-	if service == nil {
-		return false
-	}
-	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if shouldSkipService(svcName, service) {
-		return false
-	}
-	syncRequired := false
-	for i := range service.Spec.Ports {
-		servicePort := &service.Spec.Ports[i]
-		if existingPorts.Has(servicePort.Name) {
-			continue
-		}
-		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
-		info, exists := (*sm)[serviceName]
-		if exists {
-			glog.V(1).Infof("Removing service %q", serviceName)
-			if info.protocol == api.ProtocolUDP {
-				staleServices.Insert(info.clusterIP.String())
-			}
-			delete(*sm, serviceName)
-			syncRequired = true
-		} else {
-			glog.Errorf("Service %q removed, but doesn't exists", serviceName)
-		}
-	}
-	return syncRequired
-}
-
 // <serviceMap> is updated by this function (based on the given changes).
 // <changes> map is cleared after applying them.
 func updateServiceMap(
@@ -652,12 +664,16 @@
 	syncRequired = false
 	staleServices = sets.NewString()
 
-	for _, change := range changes.items {
-		mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
-		unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
-		syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
-	}
-	changes.items = make(map[types.NamespacedName]*serviceChange)
+	func() {
+		changes.Lock()
+		defer changes.Unlock()
+		for _, change := range changes.items {
+			existingPorts := serviceMap.merge(change.current)
+			serviceMap.unmerge(change.previous, existingPorts, staleServices)
+			syncRequired = true
+		}
+		changes.items = make(map[types.NamespacedName]*serviceChange)
+	}()
 
 	// TODO: If this will appear to be computationally expensive, consider
 	// computing this incrementally similarly to serviceMap.
@@ -673,30 +689,48 @@ func updateServiceMap(
 
 func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	proxier.endpointsChanges.update(&namespacedName, nil, endpoints)
-
-	proxier.syncProxyRules(syncReasonEndpoints)
+	if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) {
+		// TODO(wojtek-t): If the initial sync of the informer for either endpoints
+		// or services is not finished, it doesn't make sense to call syncProxyRules,
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after a kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but that is not critical since it can only happen right
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonEndpoints)
+	}
 }
 
 func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints)
-
-	proxier.syncProxyRules(syncReasonEndpoints)
+	if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) {
+		// TODO(wojtek-t): If the initial sync of the informer for either endpoints
+		// or services is not finished, it doesn't make sense to call syncProxyRules,
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after a kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but that is not critical since it can only happen right
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonEndpoints)
+	}
 }
 
 func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	proxier.endpointsChanges.update(&namespacedName, endpoints, nil)
-
-	proxier.syncProxyRules(syncReasonEndpoints)
+	if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) {
+		// TODO(wojtek-t): If the initial sync of the informer for either endpoints
+		// or services is not finished, it doesn't make sense to call syncProxyRules,
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after a kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but that is not critical since it can only happen right
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonEndpoints)
+	}
 }
 
 func (proxier *Proxier) OnEndpointsSynced() {
 	proxier.mu.Lock()
 	proxier.endpointsSynced = true
 	proxier.mu.Unlock()
-
+	// Call it unconditionally - this is called once per lifetime.
 	proxier.syncProxyRules(syncReasonEndpoints)
 }
 
@@ -708,17 +742,18 @@ func updateEndpointsMap(
 	hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
 	syncRequired = false
 	staleSet = make(map[endpointServicePair]bool)
-	for _, change := range changes.items {
-		oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname)
-		newEndpointsMap := endpointsToEndpointsMap(change.current, hostname)
-		if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) {
-			endpointsMap.unmerge(oldEndpointsMap)
-			endpointsMap.merge(newEndpointsMap)
-			detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet)
+
+	func() {
+		changes.Lock()
+		defer changes.Unlock()
+		for _, change := range changes.items {
+			endpointsMap.unmerge(change.previous)
+			endpointsMap.merge(change.current)
+			detectStaleConnections(change.previous, change.current, staleSet)
 			syncRequired = true
 		}
-	}
-	changes.items = make(map[types.NamespacedName]*endpointsChange)
+		changes.items = make(map[types.NamespacedName]*endpointsChange)
+	}()
 
 	if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
 		return
@@ -820,6 +855,27 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd
 	return endpointsMap
 }
 
+// Translates a single Service object to proxyServiceMap.
+//
+// NOTE: the service object should NOT be modified.
+func serviceToServiceMap(service *api.Service) proxyServiceMap {
+	if service == nil {
+		return nil
+	}
+	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	if shouldSkipService(svcName, service) {
+		return nil
+	}
+
+	serviceMap := make(proxyServiceMap)
+	for i := range service.Spec.Ports {
+		servicePort := &service.Spec.Ports[i]
+		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
+		serviceMap[serviceName] = newServiceInfo(serviceName, servicePort, service)
+	}
+	return serviceMap
+}
+
 // portProtoHash takes the ServicePortName and protocol for a service
 // returns the associated 16 character hash. This is computed by hashing (sha256)
 // then encoding to base32 and truncating to 16 chars. We do this because IPTables
@@ -914,10 +970,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}
 
 	// Figure out the new services we need to activate.
-	proxier.serviceChanges.Lock()
 	serviceSyncRequired, hcServices, staleServices := updateServiceMap(
 		proxier.serviceMap, &proxier.serviceChanges)
-	proxier.serviceChanges.Unlock()
 
 	// If this was called because of a services update, but nothing actionable has changed, skip it.
 	if reason == syncReasonServices && !serviceSyncRequired {
@@ -925,10 +979,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 		return
 	}
 
-	proxier.endpointsChanges.Lock()
 	endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
 		proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
-	proxier.endpointsChanges.Unlock()
 
 	// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
 	if reason == syncReasonEndpoints && !endpointsSyncRequired {
diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go
index aba2e3c8271..0d11b7cc40e 100644
--- a/pkg/proxy/iptables/proxier_test.go
+++ b/pkg/proxy/iptables/proxier_test.go
@@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
 		serviceMap:       make(proxyServiceMap),
 		serviceChanges:   newServiceChangeMap(),
 		endpointsMap:     make(proxyEndpointsMap),
-		endpointsChanges: newEndpointsChangeMap(),
+		endpointsChanges: newEndpointsChangeMap(testHostname),
 		iptables:         ipt,
 		clusterCIDR:      "10.0.0.0/24",
 		hostname:         testHostname,
@@ -1611,7 +1611,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv
 }
 
 func Test_updateEndpointsMap(t *testing.T) {
-	var nodeName = testHostname
+	var nodeName = testHostname
 
 	unnamedPort := func(ept *api.Endpoints) {
 		ept.Subsets = []api.EndpointSubset{{
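
Three patterns in this patch are worth calling out. The sketches below are free-standing illustrations, not the kube-proxy types: names such as changeTracker, change, and state are invented stand-ins.

First, the change-tracking pattern behind endpointsChangeMap and serviceChangeMap. update() now converts the API object into its map form immediately (which is why newEndpointsChangeMap needs the hostname: endpointsToEndpointsMap uses it to mark local endpoints), keeps only the oldest previous and the newest current snapshot per name, and drops the pending entry entirely once the two become deeply equal. An add followed by a delete, or an update that reverts an earlier one, therefore never spends a syncProxyRules call. A minimal sketch of the pattern:

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// state stands in for proxyServiceMap / proxyEndpointsMap.
type state map[string]string

type change struct {
	previous state // snapshot taken when the first event for this key arrived
	current  state // snapshot from the most recent event
}

type changeTracker struct {
	sync.Mutex
	items map[string]*change
}

// update records a transition for key and reports whether anything actionable
// is pending for it. If the newest state deep-equals the oldest recorded one,
// the intervening events cancelled out and the entry is dropped.
func (t *changeTracker) update(key string, previous, current state) bool {
	t.Lock()
	defer t.Unlock()
	c, exists := t.items[key]
	if !exists {
		c = &change{previous: previous}
		t.items[key] = c
	}
	c.current = current
	if reflect.DeepEqual(c.previous, c.current) {
		delete(t.items, key) // e.g. add+delete, or an update that was reverted
		return false
	}
	return true
}

func main() {
	t := &changeTracker{items: make(map[string]*change)}
	fmt.Println(t.update("ns/svc", nil, state{"port": "80"})) // true: something to apply
	fmt.Println(t.update("ns/svc", state{"port": "80"}, nil)) // false: reverted, entry dropped
}
```

Note that once an entry exists, the previous argument is ignored: the tracker always diffs against the state as of the last applied sync, which is exactly what makes a revert collapse into a no-op.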
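
Second, the merge/unmerge pair that replaces mergeService/unmergeService. merge returns the set of port names present in the new snapshot; unmerge then removes only the ports that disappeared, collecting the cluster IPs of removed UDP services so their conntrack entries can be flushed afterwards (UDP has no connection teardown, so stale entries would otherwise keep steering traffic at the removed service). A rough sketch of the same flow, using plain map[string]bool in place of sets.String and invented types (portMap, info):

```go
package main

import "fmt"

type info struct {
	clusterIP string
	protocol  string
}

// portMap stands in for proxyServiceMap, keyed by port name.
type portMap map[string]info

// merge copies every entry from the new snapshot and returns the set of port
// names it saw, so unmerge can tell "still present" apart from "removed".
func (m portMap) merge(other portMap) map[string]bool {
	existing := make(map[string]bool)
	for name, i := range other {
		existing[name] = true
		m[name] = i
	}
	return existing
}

// unmerge deletes entries of the previous snapshot that did not reappear in
// the new one, recording cluster IPs of removed UDP services as stale.
func (m portMap) unmerge(other portMap, existing, staleUDP map[string]bool) {
	for name := range other {
		if existing[name] {
			continue // port still exists; merge already refreshed it
		}
		if i, ok := m[name]; ok {
			if i.protocol == "UDP" {
				staleUDP[i.clusterIP] = true
			}
			delete(m, name)
		}
	}
}

func main() {
	m := make(portMap)
	previous := portMap{"dns": {clusterIP: "10.0.0.10", protocol: "UDP"}}
	m.merge(previous) // initial state

	staleUDP := make(map[string]bool)
	existing := m.merge(portMap{}) // the port was removed in the new snapshot
	m.unmerge(previous, existing, staleUDP)
	fmt.Println(staleUDP) // map[10.0.0.10:true]
}
```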
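
Finally, the locking change: syncProxyRules no longer brackets updateServiceMap/updateEndpointsMap with Lock/Unlock. Each of those functions now drains the pending changes inside an immediately invoked closure, so the lock is held only while the change map is consumed, and the health-check and stale-connection post-processing runs unlocked. A sketch of the idiom, reusing the changeTracker, change, and state types from the first sketch above:

```go
// apply drains all pending changes into the long-lived map. The anonymous
// function scopes the lock: its deferred Unlock fires when the closure
// returns, not at the end of apply, so event handlers calling update() are
// blocked only while pending items are consumed.
func apply(changes *changeTracker, into state) {
	func() {
		changes.Lock()
		defer changes.Unlock()
		for _, c := range changes.items {
			for k := range c.previous { // unmerge the old snapshot
				delete(into, k)
			}
			for k, v := range c.current { // merge the new snapshot
				into[k] = v
			}
		}
		changes.items = make(map[string]*change) // all pending changes consumed
	}()
	// ...post-processing that does not need the lock would run here...
}
```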