diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index cd5f0b5eaec..ac6a28fdafb 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -196,6 +196,7 @@ type Proxier struct { initialized int32 syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules syncPeriod time.Duration + lastIPTablesCleanup time.Time // These are effectively const and do not need the mutex to be held. iptables utiliptables.Interface @@ -890,17 +891,6 @@ func (proxier *Proxier) syncProxyRules() { // Below this point we will not return until we try to write the iptables rules. // - // Get iptables-save output so we can check for existing chains and rules. - // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingNATChains := make(map[utiliptables.Chain]struct{}) - proxier.iptablesData.Reset() - err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) - if err != nil { - klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted") - } else { - existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes()) - } - // Reset all buffers used later. // This is to avoid memory reallocations and thus improve performance. proxier.filterChains.Reset() @@ -1339,19 +1329,34 @@ func (proxier *Proxier) syncProxyRules() { } } - // Delete chains no longer in use. - for chain := range existingNATChains { - if !activeNATChains[chain] { - chainString := string(chain) - if !isServiceChainName(chainString) { - // Ignore chains that aren't ours. - continue + // Delete chains no longer in use. Since "iptables-save" can take several seconds + // to run on hosts with lots of iptables rules, we don't bother to do this on + // every sync in large clusters. (Stale chains will not be referenced by any + // active rules, so they're harmless other than taking up memory.) 
+ if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod { + var existingNATChains map[utiliptables.Chain]struct{} + + proxier.iptablesData.Reset() + if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil { + existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes()) + + for chain := range existingNATChains { + if !activeNATChains[chain] { + chainString := string(chain) + if !isServiceChainName(chainString) { + // Ignore chains that aren't ours. + continue + } + // We must (as per iptables) write a chain-line + // for it, which has the nice effect of flushing + // the chain. Then we can remove the chain. + proxier.natChains.Write(utiliptables.MakeChainLine(chain)) + proxier.natRules.Write("-X", chainString) + } } - // We must (as per iptables) write a chain-line for it, which has - // the nice effect of flushing the chain. Then we can remove the - // chain. - proxier.natChains.Write(utiliptables.MakeChainLine(chain)) - proxier.natRules.Write("-X", chainString) + proxier.lastIPTablesCleanup = time.Now() + } else { + klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted") } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 38e8d24910b..6a885aabd54 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -7438,6 +7438,7 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) fp.masqueradeAll = true + fp.syncPeriod = 30 * time.Second makeServiceMap(fp, makeTestService("ns1", "svc1", func(svc *v1.Service) { @@ -7535,6 +7536,65 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { if numComments != 0 { t.Errorf("numComments (%d) != 0 when numEndpoints (%d) > threshold (%d)", numComments, numEndpoints, largeClusterEndpointsThreshold) } + + // Now test service deletion; we have to create another service to 
+ // do this though, + // because if we deleted any of the existing services, we'd fall back out of large + // cluster mode. + svc4 := makeTestService("ns4", "svc4", func(svc *v1.Service) { + svc.Spec.Type = v1.ServiceTypeClusterIP + svc.Spec.ClusterIP = "172.30.0.44" + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p8082", + Port: 8082, + Protocol: v1.ProtocolTCP, + }} + }) + fp.OnServiceAdd(svc4) + fp.OnEndpointSliceAdd(makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"10.4.0.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: utilpointer.StringPtr("p8082"), + Port: utilpointer.Int32(8082), + Protocol: &tcpProtocol, + }} + })) + fp.syncProxyRules() + expectedEndpoints += 1 + + svc4Endpoint, numEndpoints, _ := countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") + assert.Equal(t, "-A KUBE-SEP-SU5STNODRYEWJAUF -m tcp -p tcp -j DNAT --to-destination 10.4.0.1:8082", svc4Endpoint, "svc4 endpoint was not created") + if numEndpoints != expectedEndpoints { + t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", expectedEndpoints, numEndpoints) + } + + // In large-cluster mode, if we delete a service, it will not re-sync its chains + // but it will not delete them immediately either.
+ fp.lastIPTablesCleanup = time.Now() + fp.OnServiceDelete(svc4) + fp.syncProxyRules() + expectedEndpoints -= 1 + + svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") + assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!") + if numEndpoints != expectedEndpoints { + t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", expectedEndpoints, numEndpoints) + } + assert.NotContains(t, fp.iptablesData.String(), "-X ", "iptables data unexpectedly contains chain deletions") + + // But resyncing after a long-enough delay will delete the stale chains + fp.lastIPTablesCleanup = time.Now().Add(-fp.syncPeriod).Add(-1) + fp.syncProxyRules() + + svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") + assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!") + if numEndpoints != expectedEndpoints { + t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", expectedEndpoints, numEndpoints) + } + assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SVC-EBDQOQU5SJFXRIL3", "iptables data does not contain chain deletion") + assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SEP-SU5STNODRYEWJAUF", "iptables data does not contain endpoint deletions") } // Test calling syncProxyRules() multiple times with various changes