diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1e44f472db6..2334f9863e3 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -199,7 +199,7 @@ type endpointsChange struct { } type endpointsChangeMap struct { - sync.Mutex + lock sync.Mutex items map[types.NamespacedName]*endpointsChange } @@ -209,7 +209,7 @@ type serviceChange struct { } type serviceChangeMap struct { - sync.Mutex + lock sync.Mutex items map[types.NamespacedName]*serviceChange } @@ -223,8 +223,8 @@ func newEndpointsChangeMap() endpointsChangeMap { } func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) { - ecm.Lock() - defer ecm.Unlock() + ecm.lock.Lock() + defer ecm.lock.Unlock() change, exists := ecm.items[*namespacedName] if !exists { @@ -242,8 +242,8 @@ func newServiceChangeMap() serviceChangeMap { } func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) { - scm.Lock() - defer scm.Unlock() + scm.lock.Lock() + defer scm.lock.Unlock() change, exists := scm.items[*namespacedName] if !exists { @@ -509,7 +509,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { - proxier.syncProxyRules(syncReasonForce) + proxier.syncProxyRules() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. @@ -531,21 +531,21 @@ func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} proxier.serviceChanges.update(&namespacedName, nil, service) - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} proxier.serviceChanges.update(&namespacedName, oldService, service) - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} proxier.serviceChanges.update(&namespacedName, service, nil) - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } func (proxier *Proxier) OnServiceSynced() { @@ -553,7 +553,7 @@ func (proxier *Proxier) OnServiceSynced() { proxier.servicesSynced = true proxier.mu.Unlock() - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool { @@ -587,7 +587,7 @@ func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String info := newServiceInfo(serviceName, servicePort, service) oldInfo, exists := (*sm)[serviceName] equal := reflect.DeepEqual(info, oldInfo) - if exists { + if !exists { glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) } else if !equal { glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) @@ -662,21 +662,21 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} proxier.endpointsChanges.update(&namespacedName, nil, endpoints) - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} proxier.endpointsChanges.update(&namespacedName, endpoints, nil) - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } func (proxier *Proxier) OnEndpointsSynced() { @@ -684,7 +684,7 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.endpointsSynced = true proxier.mu.Unlock() - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } // is updated by this function (based on the given changes). @@ -873,16 +873,10 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ } } -type syncReason string - -const syncReasonServices syncReason = "ServicesUpdate" -const syncReasonEndpoints syncReason = "EndpointsUpdate" -const syncReasonForce syncReason = "Force" - // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held -func (proxier *Proxier) syncProxyRules(reason syncReason) { +func (proxier *Proxier) syncProxyRules() { proxier.mu.Lock() defer proxier.mu.Unlock() @@ -891,7 +885,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } start := time.Now() defer func() { - glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start)) + glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints if !proxier.endpointsSynced || !proxier.servicesSynced { @@ -900,28 +894,20 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } // Figure out the new services we need to activate. - proxier.serviceChanges.Lock() + proxier.serviceChanges.lock.Lock() serviceSyncRequired, hcServices, staleServices := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) - proxier.serviceChanges.Unlock() + proxier.serviceChanges.lock.Unlock() - // If this was called because of a services update, but nothing actionable has changed, skip it. - if reason == syncReasonServices && !serviceSyncRequired { - glog.V(3).Infof("Skipping iptables sync because nothing changed") - return - } - - proxier.endpointsChanges.Lock() + proxier.endpointsChanges.lock.Lock() endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap( proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) - proxier.endpointsChanges.Unlock() + proxier.endpointsChanges.lock.Unlock() - // If this was called because of an endpoints update, but nothing actionable has changed, skip it. - if reason == syncReasonEndpoints && !endpointsSyncRequired { + if !serviceSyncRequired && !endpointsSyncRequired { glog.V(3).Infof("Skipping iptables sync because nothing changed") return } - glog.V(3).Infof("Syncing iptables rules") // Create and link the kube services chain. @@ -1495,8 +1481,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } proxier.portsMap = replacementPortsMap - // Update healthz timestamp if it is periodic sync. - if proxier.healthzServer != nil && reason == syncReasonForce { + // Update healthz timestamp. + if proxier.healthzServer != nil { proxier.healthzServer.UpdateTimestamp() } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 4f493d82a14..fb6b29165dd 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -579,7 +579,7 @@ func TestClusterIPReject(t *testing.T) { }), ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) svcRules := ipt.GetRules(svcChain) @@ -628,7 +628,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() epStr := fmt.Sprintf("%s:%d", epIP, svcPort) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) @@ -692,7 +692,7 @@ func TestLoadBalancer(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svcPortName.String(), proto)) @@ -749,7 +749,7 @@ func TestNodePort(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) svcChain := string(servicePortChainName(svcPortName.String(), proto)) @@ -786,7 +786,7 @@ func TestExternalIPsReject(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) { @@ -819,7 +819,7 @@ func TestNodePortReject(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) { @@ -882,7 +882,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svcPortName.String(), proto)) @@ -973,7 +973,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) lbChain := string(serviceLBChainName(svcPortName.String(), proto))