From 758c9666e52c54ecf246ecc7613e096c35abc624 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 19 May 2017 08:44:44 +0200 Subject: [PATCH] Call syncProxyRules when really needed and remove reasons --- pkg/proxy/iptables/proxier.go | 66 +++++++++++------------------- pkg/proxy/iptables/proxier_test.go | 46 ++++++++------------- 2 files changed, 40 insertions(+), 72 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index ca9c2cd183e..d37eaf63297 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -577,7 +577,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { - proxier.syncProxyRules(syncReasonForce) + proxier.syncProxyRules() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. @@ -604,7 +604,7 @@ func (proxier *Proxier) OnServiceAdd(service *api.Service) { // state right after kube-proxy restart). This can eat a token for calling // syncProxyRules, but is not that critical since it can happen only // after kube-proxy was (re)started. - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } } @@ -617,7 +617,7 @@ func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { // state right after kube-proxy restart). This can eat a token for calling // syncProxyRules, but is not that critical since it can happen only // after kube-proxy was (re)started. - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } } @@ -630,7 +630,7 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) { // state right after kube-proxy restart). This can eat a token for calling // syncProxyRules, but is not that critical since it can happen only // after kube-proxy was (re)started. - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } } @@ -639,7 +639,7 @@ func (proxier *Proxier) OnServiceSynced() { proxier.servicesSynced = true proxier.mu.Unlock() // Call it unconditionally - this is called once per lifetime. - proxier.syncProxyRules(syncReasonServices) + proxier.syncProxyRules() } func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool { @@ -660,8 +660,7 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool // map is cleared after applying them. func updateServiceMap( serviceMap proxyServiceMap, - changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) { - syncRequired = false + changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) { staleServices = sets.NewString() func() { @@ -670,7 +669,6 @@ func updateServiceMap( for _, change := range changes.items { existingPorts := serviceMap.merge(change.current) serviceMap.unmerge(change.previous, existingPorts, staleServices) - syncRequired = true } changes.items = make(map[types.NamespacedName]*serviceChange) }() @@ -684,7 +682,7 @@ func updateServiceMap( } } - return syncRequired, hcServices, staleServices + return hcServices, staleServices } func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { @@ -696,7 +694,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { // state right after kube-proxy restart). This can eat a token for calling // syncProxyRules, but is not that critical since it can happen only // after kube-proxy was (re)started. - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } } @@ -709,7 +707,7 @@ func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints // state right after kube-proxy restart). This can eat a token for calling // syncProxyRules, but is not that critical since it can happen only // after kube-proxy was (re)started. - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } } @@ -722,7 +720,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { // state right after kube-proxy restart). This can eat a token for calling // syncProxyRules, but is not that critical since it can happen only // after kube-proxy was (re)started. - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } } @@ -731,7 +729,7 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.endpointsSynced = true proxier.mu.Unlock() // Call it unconditionally - this is called once per lifetime. - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } // is updated by this function (based on the given changes). @@ -739,8 +737,7 @@ func (proxier *Proxier) OnEndpointsSynced() { func updateEndpointsMap( endpointsMap proxyEndpointsMap, changes *endpointsChangeMap, - hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { - syncRequired = false + hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { staleSet = make(map[endpointServicePair]bool) func() { @@ -750,7 +747,6 @@ func updateEndpointsMap( endpointsMap.unmerge(change.previous) endpointsMap.merge(change.current) detectStaleConnections(change.previous, change.current, staleSet) - syncRequired = true } changes.items = make(map[types.NamespacedName]*endpointsChange) }() @@ -767,7 +763,7 @@ func updateEndpointsMap( hcEndpoints[nsn] = len(ips) } - return syncRequired, hcEndpoints, staleSet + return hcEndpoints, staleSet } // are modified by this function with detected stale @@ -942,16 +938,10 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ } } -type syncReason string - -const syncReasonServices syncReason = "ServicesUpdate" -const syncReasonEndpoints syncReason = "EndpointsUpdate" -const syncReasonForce syncReason = "Force" - // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held -func (proxier *Proxier) syncProxyRules(reason syncReason) { +func (proxier *Proxier) syncProxyRules() { proxier.mu.Lock() defer proxier.mu.Unlock() @@ -961,7 +951,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { start := time.Now() defer func() { SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) - glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start)) + glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints if !proxier.endpointsSynced || !proxier.servicesSynced { @@ -969,25 +959,15 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { return } - // Figure out the new services we need to activate. - serviceSyncRequired, hcServices, staleServices := updateServiceMap( + // We assume that if syncProxyRules was called, we really want to sync them, + // even if nothing changed in the meantime. In other words, caller are + // responsible for detecting no-op changes and not calling syncProxyRules in + // such cases. + hcServices, staleServices := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) - - // If this was called because of a services update, but nothing actionable has changed, skip it. - if reason == syncReasonServices && !serviceSyncRequired { - glog.V(3).Infof("Skipping iptables sync because nothing changed") - return - } - - endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap( + hcEndpoints, staleEndpoints := updateEndpointsMap( proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) - // If this was called because of an endpoints update, but nothing actionable has changed, skip it. - if reason == syncReasonEndpoints && !endpointsSyncRequired { - glog.V(3).Infof("Skipping iptables sync because nothing changed") - return - } - glog.V(3).Infof("Syncing iptables rules") // Create and link the kube services chain. @@ -1569,8 +1549,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } proxier.portsMap = replacementPortsMap - // Update healthz timestamp if it is periodic sync. - if proxier.healthzServer != nil && reason == syncReasonForce { + // Update healthz timestamp. + if proxier.healthzServer != nil { proxier.healthzServer.UpdateTimestamp() } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 0d11b7cc40e..a665c3c57fe 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -584,7 +584,7 @@ func TestClusterIPReject(t *testing.T) { }), ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) svcRules := ipt.GetRules(svcChain) @@ -633,7 +633,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() epStr := fmt.Sprintf("%s:%d", epIP, svcPort) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) @@ -697,7 +697,7 @@ func TestLoadBalancer(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svcPortName.String(), proto)) @@ -754,7 +754,7 @@ func TestNodePort(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) svcChain := string(servicePortChainName(svcPortName.String(), proto)) @@ -791,7 +791,7 @@ func TestExternalIPsReject(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) { @@ -824,7 +824,7 @@ func TestNodePortReject(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) { @@ -887,7 +887,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svcPortName.String(), proto)) @@ -978,7 +978,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() proto := strings.ToLower(string(api.ProtocolTCP)) lbChain := string(serviceLBChainName(svcPortName.String(), proto)) @@ -1083,7 +1083,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.OnServiceAdd(services[i]) } - _, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 8 { t.Errorf("expected service map length 8, got %v", fp.serviceMap) } @@ -1116,7 +1116,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) - _, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.serviceMap) } @@ -1152,7 +1152,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { ) // Headless service should be ignored - _, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) } @@ -1180,7 +1180,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), ) - _, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.serviceMap) } @@ -1222,10 +1222,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.OnServiceAdd(servicev1) - syncRequired, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if !syncRequired { - t.Errorf("expected sync required, got %t", syncRequired) - } + hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } @@ -1239,10 +1236,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) - syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if !syncRequired { - t.Errorf("expected sync required, got %t", syncRequired) - } + hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } @@ -1256,10 +1250,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) - syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if syncRequired { - t.Errorf("not expected sync required, got %t", syncRequired) - } + hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } @@ -1272,10 +1263,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) - syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if !syncRequired { - t.Errorf("expected sync required, got %t", syncRequired) - } + hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } @@ -2397,7 +2385,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsUpdate(prev, curr) } } - _, hcEndpoints, stale := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + hcEndpoints, stale := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) if len(stale) != len(tc.expectedStale) {