diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go
index c8603c0871c..ae241219e0c 100644
--- a/pkg/proxy/nftables/proxier.go
+++ b/pkg/proxy/nftables/proxier.go
@@ -83,11 +83,6 @@ const (
 	// the anti-martian-packet rule. It should not be used for any other
 	// rules.
 	kubeletFirewallChain utiliptables.Chain = "KUBE-FIREWALL"
-
-	// largeClusterEndpointsThreshold is the number of endpoints at which
-	// we switch into "large cluster mode" and optimize for iptables
-	// performance over iptables debuggability
-	largeClusterEndpointsThreshold = 1000
 )
 
 const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
@@ -162,7 +157,6 @@ type Proxier struct {
 	initialized int32
 	syncRunner  *async.BoundedFrequencyRunner // governs calls to syncProxyRules
 	syncPeriod  time.Duration
-	lastIPTablesCleanup time.Time
 
 	// These are effectively const and do not need the mutex to be held.
 	iptables utiliptables.Interface
@@ -191,11 +185,6 @@ type Proxier struct {
 	natChains proxyutil.LineBuffer
 	natRules  proxyutil.LineBuffer
 
-	// largeClusterMode is set at the beginning of syncProxyRules if we are
-	// going to end up outputting "lots" of iptables rules and so we need to
-	// optimize for performance over debuggability.
-	largeClusterMode bool
-
 	// localhostNodePorts indicates whether we allow NodePort services to be accessed
 	// via localhost.
 	localhostNodePorts bool
@@ -742,17 +731,6 @@ func isServiceChainName(chainString string) bool {
 	return false
 }
 
-// Assumes proxier.mu is held.
-func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) []string {
-	// Not printing these comments, can reduce size of iptables (in case of large
-	// number of endpoints) even by 40%+. So if total number of endpoint chains
-	// is large enough, we simply drop those comments.
-	if proxier.largeClusterMode {
-		return args
-	}
-	return append(args, "-m", "comment", "--comment", svcName)
-}
-
 // Called by the iptables.Monitor, and in response to topology changes; this calls
 // syncProxyRules() and tells it to resync all services, regardless of whether the
 // Service or Endpoints/EndpointSlice objects themselves have changed
@@ -940,7 +918,6 @@ func (proxier *Proxier) syncProxyRules() {
 	for svcName := range proxier.svcPortMap {
 		totalEndpoints += len(proxier.endpointsMap[svcName])
 	}
-	proxier.largeClusterMode = (totalEndpoints > largeClusterEndpointsThreshold)
 
 	// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
 	// metric.
@@ -1372,7 +1349,7 @@ func (proxier *Proxier) syncProxyRules() {
 			activeNATChains[endpointChain] = true
 
 			args = append(args[:0], "-A", string(endpointChain))
-			args = proxier.appendServiceCommentLocked(args, svcPortNameString)
+			args = append(args, "-m", "comment", "--comment", svcPortNameString)
 			// Handle traffic that loops back to the originator with SNAT.
 			natRules.Write(
 				args,
@@ -1388,37 +1365,31 @@ func (proxier *Proxier) syncProxyRules() {
 		}
 	}
 
-	// Delete chains no longer in use. Since "iptables-save" can take several seconds
-	// to run on hosts with lots of iptables rules, we don't bother to do this on
-	// every sync in large clusters. (Stale chains will not be referenced by any
-	// active rules, so they're harmless other than taking up memory.)
+	// Delete chains no longer in use.
 	deletedChains := 0
-	if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod {
-		var existingNATChains map[utiliptables.Chain]struct{}
+	var existingNATChains map[utiliptables.Chain]struct{}
 
-		proxier.iptablesData.Reset()
-		if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
-			existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
+	proxier.iptablesData.Reset()
+	if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
+		existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
 
-			for chain := range existingNATChains {
-				if !activeNATChains[chain] {
-					chainString := string(chain)
-					if !isServiceChainName(chainString) {
-						// Ignore chains that aren't ours.
-						continue
-					}
-					// We must (as per iptables) write a chain-line
-					// for it, which has the nice effect of flushing
-					// the chain. Then we can remove the chain.
-					proxier.natChains.Write(utiliptables.MakeChainLine(chain))
-					proxier.natRules.Write("-X", chainString)
-					deletedChains++
+		for chain := range existingNATChains {
+			if !activeNATChains[chain] {
+				chainString := string(chain)
+				if !isServiceChainName(chainString) {
+					// Ignore chains that aren't ours.
+					continue
 				}
+				// We must (as per iptables) write a chain-line
+				// for it, which has the nice effect of flushing
+				// the chain. Then we can remove the chain.
+				proxier.natChains.Write(utiliptables.MakeChainLine(chain))
+				proxier.natRules.Write("-X", chainString)
+				deletedChains++
 			}
-			proxier.lastIPTablesCleanup = time.Now()
-		} else {
-			klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
 		}
+	} else {
+		klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
 	}
 
 	// Finally, tail-call to the nodePorts chain. This needs to be after all
@@ -1579,7 +1550,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
 			args = append(args[:0],
 				"-A", string(svcChain),
 			)
-			args = proxier.appendServiceCommentLocked(args, comment)
+			args = append(args, "-m", "comment", "--comment", comment)
 			args = append(args, "-m", "recent", "--name", string(epInfo.ChainName),
 				"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
@@ -1599,7 +1570,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
 		comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
 
 		args = append(args[:0], "-A", string(svcChain))
-		args = proxier.appendServiceCommentLocked(args, comment)
+		args = append(args, "-m", "comment", "--comment", comment)
 		if i < (numEndpoints - 1) {
 			// Each rule is a probabilistic match.
 			args = append(args,
diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go
index a8ee582ea58..05cd376a041 100644
--- a/pkg/proxy/nftables/proxier_test.go
+++ b/pkg/proxy/nftables/proxier_test.go
@@ -5943,208 +5943,6 @@ func TestInternalExternalMasquerade(t *testing.T) {
 	}
 }
 
-func countEndpointsAndComments(iptablesData string, matchEndpoint string) (string, int, int) {
-	var numEndpoints, numComments int
-	var matched string
-	for _, line := range strings.Split(iptablesData, "\n") {
-		if strings.HasPrefix(line, "-A KUBE-SEP-") && strings.Contains(line, "-j DNAT") {
-			numEndpoints++
-			if strings.Contains(line, "--comment") {
-				numComments++
-			}
-			if strings.Contains(line, matchEndpoint) {
-				matched = line
-			}
-		}
-	}
-	return matched, numEndpoints, numComments
-}
-
-func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
-	ipt := iptablestest.NewFake()
-	fp := NewFakeProxier(ipt)
-	fp.masqueradeAll = true
-	fp.syncPeriod = 30 * time.Second
-
-	makeServiceMap(fp,
-		makeTestService("ns1", "svc1", func(svc *v1.Service) {
-			svc.Spec.Type = v1.ServiceTypeClusterIP
-			svc.Spec.ClusterIP = "172.30.0.41"
-			svc.Spec.Ports = []v1.ServicePort{{
-				Name:     "p80",
-				Port:     80,
-				Protocol: v1.ProtocolTCP,
-			}}
-		}),
-		makeTestService("ns2", "svc2", func(svc *v1.Service) {
-			svc.Spec.Type = v1.ServiceTypeClusterIP
-			svc.Spec.ClusterIP = "172.30.0.42"
-			svc.Spec.Ports = []v1.ServicePort{{
-				Name:     "p8080",
-				Port:     8080,
-				Protocol: v1.ProtocolTCP,
-			}}
-		}),
-		makeTestService("ns3", "svc3", func(svc *v1.Service) {
-			svc.Spec.Type = v1.ServiceTypeClusterIP
-			svc.Spec.ClusterIP = "172.30.0.43"
-			svc.Spec.Ports = []v1.ServicePort{{
-				Name:     "p8081",
-				Port:     8081,
-				Protocol: v1.ProtocolTCP,
-			}}
-		}),
-	)
-
-	populateEndpointSlices(fp,
-		makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
-			eps.AddressType = discovery.AddressTypeIPv4
-			eps.Endpoints = make([]discovery.Endpoint, largeClusterEndpointsThreshold/2-1)
-			for i := range eps.Endpoints {
-				eps.Endpoints[i].Addresses = []string{fmt.Sprintf("10.0.%d.%d", i%256, i/256)}
-			}
-			eps.Ports = []discovery.EndpointPort{{
-				Name:     ptr.To("p80"),
-				Port:     ptr.To[int32](80),
-				Protocol: ptr.To(v1.ProtocolTCP),
-			}}
-		}),
-		makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
-			eps.AddressType = discovery.AddressTypeIPv4
-			eps.Endpoints = make([]discovery.Endpoint, largeClusterEndpointsThreshold/2-1)
-			for i := range eps.Endpoints {
-				eps.Endpoints[i].Addresses = []string{fmt.Sprintf("10.1.%d.%d", i%256, i/256)}
-			}
-			eps.Ports = []discovery.EndpointPort{{
-				Name:     ptr.To("p8080"),
-				Port:     ptr.To[int32](8080),
-				Protocol: ptr.To(v1.ProtocolTCP),
-			}}
-		}),
-	)
-
-	fp.syncProxyRules()
-	expectedEndpoints := 2 * (largeClusterEndpointsThreshold/2 - 1)
-
-	firstEndpoint, numEndpoints, numComments := countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
-	assert.Equal(t, "-A KUBE-SEP-DKGQUZGBKLTPAR56 -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.0.0:80", firstEndpoint)
-	if numEndpoints != expectedEndpoints {
-		t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
-	}
-	if numComments != numEndpoints {
-		t.Errorf("numComments (%d) != numEndpoints (%d) when numEndpoints < threshold (%d)", numComments, numEndpoints, largeClusterEndpointsThreshold)
-	}
-
-	fp.OnEndpointSliceAdd(makeTestEndpointSlice("ns3", "svc3", 1, func(eps *discovery.EndpointSlice) {
-		eps.AddressType = discovery.AddressTypeIPv4
-		eps.Endpoints = []discovery.Endpoint{{
-			Addresses: []string{"203.0.113.4"},
-		}, {
-			Addresses: []string{"203.0.113.8"},
-		}, {
-			Addresses: []string{"203.0.113.12"},
-		}}
-		eps.Ports = []discovery.EndpointPort{{
-			Name:     ptr.To("p8081"),
-			Port:     ptr.To[int32](8081),
-			Protocol: ptr.To(v1.ProtocolTCP),
-		}}
-	}))
-	fp.syncProxyRules()
-
-	firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "203.0.113.4")
-	assert.Equal(t, "-A KUBE-SEP-RUVVH7YV3PHQBDOS -m tcp -p tcp -j DNAT --to-destination 203.0.113.4:8081", firstEndpoint)
-	// syncProxyRules will only have output the endpoints for svc3, since the others
-	// didn't change (and syncProxyRules doesn't automatically do a full resync when you
-	// cross the largeClusterEndpointsThreshold).
-	if numEndpoints != 3 {
-		t.Errorf("Found wrong number of endpoints on partial resync: expected %d, got %d", 3, numEndpoints)
-	}
-	if numComments != 0 {
-		t.Errorf("numComments (%d) != 0 after partial resync when numEndpoints (%d) > threshold (%d)", numComments, expectedEndpoints+3, largeClusterEndpointsThreshold)
-	}
-
-	// Now force a full resync and confirm that it rewrites the older services with
-	// no comments as well.
-	fp.forceSyncProxyRules()
-	expectedEndpoints += 3
-
-	firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
-	assert.Equal(t, "-A KUBE-SEP-DKGQUZGBKLTPAR56 -m tcp -p tcp -j DNAT --to-destination 10.0.0.0:80", firstEndpoint)
-	if numEndpoints != expectedEndpoints {
-		t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
-	}
-	if numComments != 0 {
-		t.Errorf("numComments (%d) != 0 when numEndpoints (%d) > threshold (%d)", numComments, numEndpoints, largeClusterEndpointsThreshold)
-	}
-
-	// Now test service deletion; we have to create another service to do this though,
-	// because if we deleted any of the existing services, we'd fall back out of large
-	// cluster mode.
-	svc4 := makeTestService("ns4", "svc4", func(svc *v1.Service) {
-		svc.Spec.Type = v1.ServiceTypeClusterIP
-		svc.Spec.ClusterIP = "172.30.0.44"
-		svc.Spec.Ports = []v1.ServicePort{{
-			Name:     "p8082",
-			Port:     8082,
-			Protocol: v1.ProtocolTCP,
-		}}
-	})
-	fp.OnServiceAdd(svc4)
-	fp.OnEndpointSliceAdd(makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) {
-		eps.AddressType = discovery.AddressTypeIPv4
-		eps.Endpoints = []discovery.Endpoint{{
-			Addresses: []string{"10.4.0.1"},
-		}}
-		eps.Ports = []discovery.EndpointPort{{
-			Name:     ptr.To("p8082"),
-			Port:     ptr.To[int32](8082),
-			Protocol: ptr.To(v1.ProtocolTCP),
-		}}
-	}))
-	fp.syncProxyRules()
-
-	svc4Endpoint, numEndpoints, _ := countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
-	assert.Equal(t, "-A KUBE-SEP-SU5STNODRYEWJAUF -m tcp -p tcp -j DNAT --to-destination 10.4.0.1:8082", svc4Endpoint, "svc4 endpoint was not created")
-	// should only sync svc4
-	if numEndpoints != 1 {
-		t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", 1, numEndpoints)
-	}
-
-	// In large-cluster mode, if we delete a service, it will not re-sync its chains
-	// but it will not delete them immediately either.
-	fp.lastIPTablesCleanup = time.Now()
-	fp.OnServiceDelete(svc4)
-	fp.syncProxyRules()
-
-	svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
-	assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
-	// should only sync svc4, and shouldn't output its endpoints
-	if numEndpoints != 0 {
-		t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", 0, numEndpoints)
-	}
-	assert.NotContains(t, fp.iptablesData.String(), "-X ", "iptables data unexpectedly contains chain deletions")
-
-	// But resyncing after a long-enough delay will delete the stale chains
-	fp.lastIPTablesCleanup = time.Now().Add(-fp.syncPeriod).Add(-1)
-	fp.syncProxyRules()
-
-	svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
-	assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
-	if numEndpoints != 0 {
-		t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", 0, numEndpoints)
-	}
-	assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SVC-EBDQOQU5SJFXRIL3", "iptables data does not contain chain deletion")
-	assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SEP-SU5STNODRYEWJAUF", "iptables data does not contain endpoint deletions")
-
-	// force a full sync and count
-	fp.forceSyncProxyRules()
-	_, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
-	if numEndpoints != expectedEndpoints {
-		t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
-	}
-}
-
 // Test calling syncProxyRules() multiple times with various changes
 func TestSyncProxyRulesRepeated(t *testing.T) {
 	ipt := iptablestest.NewFake()
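Note on the flush-then-delete sequence kept in the cleanup loop above: as the retained comment says, iptables requires writing a chain-line before a chain can be removed, because declaring a chain in an iptables-restore payload flushes it; "-X" then deletes the emptied chain. The following standalone Go sketch shows the shape of the *nat fragment this produces. It is an illustration only, not the proxier's code, and the chain name is a placeholder rather than one of the hash-derived names (KUBE-SVC-*, KUBE-SEP-*) seen in the tests.

package main

import (
	"fmt"
	"strings"
)

// buildStaleChainCleanup sketches the restore payload that the cleanup loop
// assembles for stale chains: a ":CHAIN - [0:0]" declaration (which flushes
// the chain when restored, as utiliptables.MakeChainLine emits) followed by
// "-X CHAIN" to delete the now-empty chain.
func buildStaleChainCleanup(staleChains []string) string {
	var b strings.Builder
	b.WriteString("*nat\n")
	for _, chain := range staleChains {
		// Declaring the chain flushes it on restore.
		fmt.Fprintf(&b, ":%s - [0:0]\n", chain)
	}
	for _, chain := range staleChains {
		// "-X" deletes the emptied chain.
		fmt.Fprintf(&b, "-X %s\n", chain)
	}
	b.WriteString("COMMIT\n")
	return b.String()
}

func main() {
	// Placeholder chain name for illustration.
	fmt.Print(buildStaleChainCleanup([]string{"KUBE-SVC-EXAMPLE"}))
}

Restored with the no-flush option (as the proxier does), a payload of this shape touches only the chains it names, so flushing and deleting every stale chain happens in a single transaction.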