From eb6949a53e6d96590e3cb2ea5abc4d0988953fff Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 21 Apr 2017 15:16:16 +0200 Subject: [PATCH] Change locking mechanism in kube-proxy --- pkg/proxy/iptables/proxier.go | 174 +++++++++++++---------------- pkg/proxy/iptables/proxier_test.go | 4 +- 2 files changed, 82 insertions(+), 96 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 202901a24cb..26f03889ec5 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -199,16 +199,62 @@ type endpointsChange struct { current *api.Endpoints } +type endpointsChangeMap struct { + sync.Mutex + items map[types.NamespacedName]*endpointsChange +} + type serviceChange struct { previous *api.Service current *api.Service } -type endpointsChangeMap map[types.NamespacedName]*endpointsChange -type serviceChangeMap map[types.NamespacedName]*serviceChange +type serviceChangeMap struct { + sync.Mutex + items map[types.NamespacedName]*serviceChange +} + type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo +func newEndpointsChangeMap() endpointsChangeMap { + return endpointsChangeMap{ + items: make(map[types.NamespacedName]*endpointsChange), + } +} + +func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) { + ecm.Lock() + defer ecm.Unlock() + + change, exists := ecm.items[*namespacedName] + if !exists { + change = &endpointsChange{} + change.previous = previous + ecm.items[*namespacedName] = change + } + change.current = current +} + +func newServiceChangeMap() serviceChangeMap { + return serviceChangeMap{ + items: make(map[types.NamespacedName]*serviceChange), + } +} + +func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) { + scm.Lock() + defer scm.Unlock() + + change, exists := scm.items[*namespacedName] + if !exists { + change = &serviceChange{} + change.previous = previous + scm.items[*namespacedName] = change + } + change.current = current +} + func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { for svcPort := range other { em[svcPort] = other[svcPort] @@ -224,24 +270,17 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { - mu sync.Mutex // protects the following fields - - serviceMap proxyServiceMap - // serviceChanges contains all changes to services that happened since - // last syncProxyRules call. For a single object, changes are accumulated, - // i.e. previous is state from before all of them, current is state after - // applying all of those. - serviceChanges serviceChangeMap - - endpointsMap proxyEndpointsMap - // endpointsChanges contains all changes to endpoints that happened since - // last syncProxyRules call. For a single object, changes are accumulated, - // i.e. previous is state from before all of them, current is state after - // applying all of those. + // endpointsChanges and serviceChanges contains all changes to endpoints and + // services that happened since last syncProxyRules call. For a single object, + // changes are accumulated, i.e. previous is state from before all of them, + // current is state after applying all of those. endpointsChanges endpointsChangeMap + serviceChanges serviceChangeMap - portsMap map[localPort]closeable - + mu sync.Mutex // protects the following fields + serviceMap proxyServiceMap + endpointsMap proxyEndpointsMap + portsMap map[localPort]closeable // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating iptables // with some partial data after kube-proxy restart. @@ -360,9 +399,9 @@ func NewProxier(ipt utiliptables.Interface, return &Proxier{ portsMap: make(map[localPort]closeable), serviceMap: make(proxyServiceMap), - serviceChanges: make(serviceChangeMap), + serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), - endpointsChanges: make(endpointsChangeMap), + endpointsChanges: newEndpointsChangeMap(), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, throttle: throttle, @@ -468,8 +507,6 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { - proxier.mu.Lock() - defer proxier.mu.Unlock() proxier.syncProxyRules(syncReasonForce) } @@ -486,59 +523,30 @@ func (proxier *Proxier) SyncLoop() { func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - - proxier.mu.Lock() - defer proxier.mu.Unlock() - - change, exists := proxier.serviceChanges[namespacedName] - if !exists { - change = &serviceChange{} - change.previous = nil - proxier.serviceChanges[namespacedName] = change - } - change.current = service + proxier.serviceChanges.update(&namespacedName, nil, service) proxier.syncProxyRules(syncReasonServices) } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - - proxier.mu.Lock() - defer proxier.mu.Unlock() - - change, exists := proxier.serviceChanges[namespacedName] - if !exists { - change = &serviceChange{} - change.previous = oldService - proxier.serviceChanges[namespacedName] = change - } - change.current = service + proxier.serviceChanges.update(&namespacedName, oldService, service) proxier.syncProxyRules(syncReasonServices) } func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - - proxier.mu.Lock() - defer proxier.mu.Unlock() - - change, exists := proxier.serviceChanges[namespacedName] - if !exists { - change = &serviceChange{} - change.previous = service - proxier.serviceChanges[namespacedName] = change - } - change.current = nil + proxier.serviceChanges.update(&namespacedName, service, nil) proxier.syncProxyRules(syncReasonServices) } func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() - defer proxier.mu.Unlock() proxier.servicesSynced = true + proxier.mu.Unlock() + proxier.syncProxyRules(syncReasonServices) } @@ -625,12 +633,12 @@ func updateServiceMap( syncRequired = false staleServices = sets.NewString() - for _, change := range *changes { + for _, change := range changes.items { mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current) unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices) syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired } - *changes = make(serviceChangeMap) + changes.items = make(map[types.NamespacedName]*serviceChange) // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to serviceMap. @@ -652,59 +660,30 @@ func updateServiceMap( func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - - proxier.mu.Lock() - defer proxier.mu.Unlock() - - change, exists := proxier.endpointsChanges[namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = nil - proxier.endpointsChanges[namespacedName] = change - } - change.current = endpoints + proxier.endpointsChanges.update(&namespacedName, nil, endpoints) proxier.syncProxyRules(syncReasonEndpoints) } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - - proxier.mu.Lock() - defer proxier.mu.Unlock() - - change, exists := proxier.endpointsChanges[namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = oldEndpoints - proxier.endpointsChanges[namespacedName] = change - } - change.current = endpoints + proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) proxier.syncProxyRules(syncReasonEndpoints) } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - - proxier.mu.Lock() - defer proxier.mu.Unlock() - - change, exists := proxier.endpointsChanges[namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = endpoints - proxier.endpointsChanges[namespacedName] = change - } - change.current = nil + proxier.endpointsChanges.update(&namespacedName, endpoints, nil) proxier.syncProxyRules(syncReasonEndpoints) } func (proxier *Proxier) OnEndpointsSynced() { proxier.mu.Lock() - defer proxier.mu.Unlock() proxier.endpointsSynced = true + proxier.mu.Unlock() + proxier.syncProxyRules(syncReasonEndpoints) } @@ -716,7 +695,7 @@ func updateEndpointsMap( hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { syncRequired = false staleSet = make(map[endpointServicePair]bool) - for _, change := range *changes { + for _, change := range changes.items { oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname) newEndpointsMap := endpointsToEndpointsMap(change.current, hostname) if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) { @@ -726,7 +705,7 @@ func updateEndpointsMap( syncRequired = true } } - *changes = make(endpointsChangeMap) + changes.items = make(map[types.NamespacedName]*endpointsChange) if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { return @@ -904,6 +883,9 @@ const syncReasonForce syncReason = "Force" // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held func (proxier *Proxier) syncProxyRules(reason syncReason) { + proxier.mu.Lock() + defer proxier.mu.Unlock() + if proxier.throttle != nil { proxier.throttle.Accept() } @@ -918,8 +900,10 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } // Figure out the new services we need to activate. + proxier.serviceChanges.Lock() serviceSyncRequired, hcServices, staleServices := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) + proxier.serviceChanges.Unlock() // If this was called because of a services update, but nothing actionable has changed, skip it. if reason == syncReasonServices && !serviceSyncRequired { @@ -927,8 +911,10 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { return } + proxier.endpointsChanges.Lock() endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap( proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + proxier.endpointsChanges.Unlock() // If this was called because of an endpoints update, but nothing actionable has changed, skip it. if reason == syncReasonEndpoints && !endpointsSyncRequired { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 1fed2161f92..4f493d82a14 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -386,9 +386,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { return &Proxier{ exec: &exec.FakeExec{}, serviceMap: make(proxyServiceMap), - serviceChanges: make(serviceChangeMap), + serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), - endpointsChanges: make(endpointsChangeMap), + endpointsChanges: newEndpointsChangeMap(), iptables: ipt, clusterCIDR: "10.0.0.0/24", hostname: testHostname,