diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index d9da0304dd4..951b2a860ea 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -567,6 +567,13 @@ const ( // Enable MinDomains in Pod Topology Spread. MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread" + // owner: @danwinship + // kep: http://kep.k8s.io/3453 + // alpha: v1.26 + // + // Enables new performance-improving code in kube-proxy iptables mode + MinimizeIPTablesRestore featuregate.Feature = "MinimizeIPTablesRestore" + // owner: @janosi @bridgetkromhout // kep: http://kep.k8s.io/1435 // alpha: v1.20 @@ -1030,6 +1037,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Beta}, + MinimizeIPTablesRestore: {Default: false, PreRelease: featuregate.Alpha}, + MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta}, MultiCIDRRangeAllocator: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 1969fb4b21f..ca7071a9eb6 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -302,6 +302,24 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E return changeNeeded } +// PendingChanges returns a set whose keys are the names of the services whose endpoints +// have changed since the last time ect was used to update an EndpointsMap. (You must call +// this _before_ calling em.Update(ect).) +func (ect *EndpointChangeTracker) PendingChanges() sets.String { + if ect.endpointSliceCache != nil { + return ect.endpointSliceCache.pendingChanges() + } + + ect.lock.Lock() + defer ect.lock.Unlock() + + changes := sets.NewString() + for name := range ect.items { + changes.Insert(name.String()) + } + return changes +} + // checkoutChanges returns a list of pending endpointsChanges and marks them as // applied. func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 4e900ba85ae..660a3a2efe7 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -825,6 +825,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints []ServiceEndpoint expectedStaleServiceNames map[ServicePortName]bool expectedHealthchecks map[types.NamespacedName]int + expectedChangedEndpoints sets.String }{{ name: "empty", oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, @@ -832,6 +833,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, unnamed port", previousEndpoints: []*v1.Endpoints{ @@ -853,6 +855,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, named port, local", previousEndpoints: []*v1.Endpoints{ @@ -876,6 +879,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple subsets", previousEndpoints: []*v1.Endpoints{ @@ -903,6 +907,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple subsets, multiple ports, local", previousEndpoints: []*v1.Endpoints{ @@ -938,6 +943,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple endpoints, subsets, IPs, and ports", previousEndpoints: []*v1.Endpoints{ @@ -1006,6 +1012,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, + expectedChangedEndpoints: sets.NewString(), }, { name: "add an Endpoints", previousEndpoints: []*v1.Endpoints{ @@ -1027,6 +1034,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "remove an Endpoints", previousEndpoints: []*v1.Endpoints{ @@ -1047,6 +1055,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add an IP and port", previousEndpoints: []*v1.Endpoints{ @@ -1077,6 +1086,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "remove an IP and port", previousEndpoints: []*v1.Endpoints{ @@ -1112,6 +1122,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add a subset", previousEndpoints: []*v1.Endpoints{ @@ -1140,6 +1151,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "remove a subset", previousEndpoints: []*v1.Endpoints{ @@ -1167,6 +1179,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "rename a port", previousEndpoints: []*v1.Endpoints{ @@ -1192,7 +1205,8 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "renumber a port", previousEndpoints: []*v1.Endpoints{ @@ -1217,6 +1231,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "complex add and remove", previousEndpoints: []*v1.Endpoints{ @@ -1292,6 +1307,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), }, { name: "change from 0 endpoint address to 1 unnamed port", previousEndpoints: []*v1.Endpoints{ @@ -1310,7 +1326,8 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, } @@ -1346,6 +1363,12 @@ func TestUpdateEndpointsMap(t *testing.T) { fp.updateEndpoints(prev, curr) } } + + pendingChanges := fp.endpointsChanges.PendingChanges() + if !pendingChanges.Equal(tc.expectedChangedEndpoints) { + t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.List(), pendingChanges.List()) + } + result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMapsStr(t, newMap, tc.expectedResult) @@ -1520,13 +1543,14 @@ func TestEndpointSliceUpdate(t *testing.T) { fqdnSlice.AddressType = discovery.AddressTypeFQDN testCases := map[string]struct { - startingSlices []*discovery.EndpointSlice - endpointChangeTracker *EndpointChangeTracker - namespacedName types.NamespacedName - paramEndpointSlice *discovery.EndpointSlice - paramRemoveSlice bool - expectedReturnVal bool - expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo + startingSlices []*discovery.EndpointSlice + endpointChangeTracker *EndpointChangeTracker + namespacedName types.NamespacedName + paramEndpointSlice *discovery.EndpointSlice + paramRemoveSlice bool + expectedReturnVal bool + expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo + expectedChangedEndpoints sets.String }{ // test starting from an empty state "add a simple slice that doesn't already exist": { @@ -1548,30 +1572,33 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // test no modification to state - current change should be nil as nothing changes "add the same slice that already exists": { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: false, - expectedCurrentChange: nil, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, + expectedChangedEndpoints: sets.NewString(), }, // ensure that only valide address types are processed "add an FQDN slice (invalid address type)": { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: fqdnSlice, - paramRemoveSlice: false, - expectedReturnVal: false, - expectedCurrentChange: nil, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: fqdnSlice, + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, + expectedChangedEndpoints: sets.NewString(), }, // test additions to existing state "add a slice that overlaps with existing state": { @@ -1604,6 +1631,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // test additions to existing state with partially overlapping slices and ports "add a slice that overlaps with existing state and partial ports": { @@ -1634,6 +1662,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // test deletions from existing state with partially overlapping slices and ports "remove a slice that overlaps with existing state": { @@ -1656,6 +1685,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // ensure a removal that has no effect turns into a no-op "remove a slice that doesn't even exist in current state": { @@ -1663,12 +1693,13 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: true, - expectedReturnVal: false, - expectedCurrentChange: nil, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: true, + expectedReturnVal: false, + expectedCurrentChange: nil, + expectedChangedEndpoints: sets.NewString(), }, // start with all endpoints ready, transition to no endpoints ready "transition all endpoints to unready state": { @@ -1692,6 +1723,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true, Ready: false, Serving: false, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // start with no endpoints ready, transition to all endpoints ready "transition all endpoints to ready state": { @@ -1713,6 +1745,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // start with some endpoints ready, transition to more endpoints ready "transition some endpoints to ready state": { @@ -1741,6 +1774,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // start with some endpoints ready, transition to some terminating "transition some endpoints to terminating state": { @@ -1769,6 +1803,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: true}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, } @@ -1783,6 +1818,12 @@ func TestEndpointSliceUpdate(t *testing.T) { if tc.endpointChangeTracker.items == nil { t.Errorf("Expected ect.items to not be nil") } + + pendingChanges := tc.endpointChangeTracker.PendingChanges() + if !pendingChanges.Equal(tc.expectedChangedEndpoints) { + t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.List(), pendingChanges.List()) + } + changes := tc.endpointChangeTracker.checkoutChanges() if tc.expectedCurrentChange == nil { if len(changes) != 0 { diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index b4e52e23e87..6f00982d4e2 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -188,6 +188,21 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint return changed } +// pendingChanges returns a set whose keys are the names of the services whose endpoints +// have changed since the last time checkoutChanges was called +func (cache *EndpointSliceCache) pendingChanges() sets.String { + cache.lock.Lock() + defer cache.lock.Unlock() + + changes := sets.NewString() + for serviceNN, esTracker := range cache.trackerByServiceMap { + if len(esTracker.pending) > 0 { + changes.Insert(serviceNN.String()) + } + } + return changes +} + // checkoutChanges returns a list of all endpointsChanges that are // pending and then marks them as applied. func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange { diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1cc9dfb5055..5ba6d8bb534 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,9 +38,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" @@ -163,6 +165,7 @@ type Proxier struct { // updating iptables with some partial data after kube-proxy restart. endpointSlicesSynced bool servicesSynced bool + needFullSync bool initialized int32 syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules syncPeriod time.Duration @@ -298,7 +301,7 @@ func NewProxier(ipt utiliptables.Interface, proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs) go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, - proxier.syncProxyRules, syncPeriod, wait.NeverStop) + proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop) if ipt.HasRandomFully() { klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol()) @@ -539,7 +542,7 @@ func (proxier *Proxier) OnServiceSynced() { proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. - proxier.syncProxyRules() + proxier.forceSyncProxyRules() } // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object @@ -575,7 +578,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. - proxier.syncProxyRules() + proxier.forceSyncProxyRules() } // OnNodeAdd is called whenever creation of new node object @@ -596,6 +599,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) { for k, v := range node.Labels { proxier.nodeLabels[k] = v } + proxier.needFullSync = true proxier.mu.Unlock() klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels) @@ -620,6 +624,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { for k, v := range node.Labels { proxier.nodeLabels[k] = v } + proxier.needFullSync = true proxier.mu.Unlock() klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels) @@ -636,6 +641,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) { } proxier.mu.Lock() proxier.nodeLabels = nil + proxier.needFullSync = true proxier.mu.Unlock() proxier.Sync() @@ -769,6 +775,17 @@ func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string return append(args, "-m", "comment", "--comment", svcName) } +// Called by the iptables.Monitor, and in response to topology changes; this calls +// syncProxyRules() and tells it to resync all services, regardless of whether the +// Service or Endpoints/EndpointSlice objects themselves have changed +func (proxier *Proxier) forceSyncProxyRules() { + proxier.mu.Lock() + proxier.needFullSync = true + proxier.mu.Unlock() + + proxier.syncProxyRules() +} + // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // This assumes proxier.mu is NOT held @@ -789,9 +806,12 @@ func (proxier *Proxier) syncProxyRules() { klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start)) }() - // We assume that if this was called, we really want to sync them, - // even if nothing changed in the meantime. In other words, callers are - // responsible for detecting no-op changes and not calling this function. + tryPartialSync := !proxier.needFullSync && utilfeature.DefaultFeatureGate.Enabled(features.MinimizeIPTablesRestore) + var serviceChanged, endpointsChanged sets.String + if tryPartialSync { + serviceChanged = proxier.serviceChanges.PendingChanges() + endpointsChanged = proxier.endpointsChanges.PendingChanges() + } serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) @@ -826,6 +846,10 @@ func (proxier *Proxier) syncProxyRules() { if !success { klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod) proxier.syncRunner.RetryAfter(proxier.syncPeriod) + // proxier.serviceChanges and proxier.endpointChanges have already + // been flushed, so we've lost the state needed to be able to do + // a partial sync. + proxier.needFullSync = true } }() @@ -1184,6 +1208,13 @@ func (proxier *Proxier) syncProxyRules() { ) } + // If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync + // then we can omit them from the restore input. (We have already marked + // them in activeNATChains, so they won't get deleted.) + if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) { + continue + } + // Set up internal traffic handling. if hasInternalEndpoints { args = append(args[:0], @@ -1479,6 +1510,7 @@ func (proxier *Proxier) syncProxyRules() { return } success = true + proxier.needFullSync = false for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { for _, lastChangeTriggerTime := range lastChangeTriggerTimes { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 92f0d655336..5c16d359e67 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -7491,6 +7491,8 @@ func countEndpointsAndComments(iptablesData string, matchEndpoint string) (strin } func TestSyncProxyRulesLargeClusterMode(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)() + ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) fp.masqueradeAll = true @@ -7582,6 +7584,22 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { }} })) fp.syncProxyRules() + + firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "203.0.113.4") + assert.Equal(t, "-A KUBE-SEP-RUVVH7YV3PHQBDOS -m tcp -p tcp -j DNAT --to-destination 203.0.113.4:8081", firstEndpoint) + // syncProxyRules will only have output the endpoints for svc3, since the others + // didn't change (and syncProxyRules doesn't automatically do a full resync when you + // cross the largeClusterEndpointsThreshold). + if numEndpoints != 3 { + t.Errorf("Found wrong number of endpoints on partial resync: expected %d, got %d", 3, numEndpoints) + } + if numComments != 0 { + t.Errorf("numComments (%d) != 0 after partial resync when numEndpoints (%d) > threshold (%d)", numComments, expectedEndpoints+3, largeClusterEndpointsThreshold) + } + + // Now force a full resync and confirm that it rewrites the older services with + // no comments as well. + fp.forceSyncProxyRules() expectedEndpoints += 3 firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0") @@ -7618,12 +7636,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { }} })) fp.syncProxyRules() - expectedEndpoints += 1 svc4Endpoint, numEndpoints, _ := countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") assert.Equal(t, "-A KUBE-SEP-SU5STNODRYEWJAUF -m tcp -p tcp -j DNAT --to-destination 10.4.0.1:8082", svc4Endpoint, "svc4 endpoint was not created") - if numEndpoints != expectedEndpoints { - t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", expectedEndpoints, numEndpoints) + // should only sync svc4 + if numEndpoints != 1 { + t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", 1, numEndpoints) } // In large-cluster mode, if we delete a service, it will not re-sync its chains @@ -7631,12 +7649,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { fp.lastIPTablesCleanup = time.Now() fp.OnServiceDelete(svc4) fp.syncProxyRules() - expectedEndpoints -= 1 svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!") - if numEndpoints != expectedEndpoints { - t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", expectedEndpoints, numEndpoints) + // should only sync svc4, and shouldn't output its endpoints + if numEndpoints != 0 { + t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", 0, numEndpoints) } assert.NotContains(t, fp.iptablesData.String(), "-X ", "iptables data unexpectedly contains chain deletions") @@ -7646,15 +7664,24 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!") - if numEndpoints != expectedEndpoints { - t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", expectedEndpoints, numEndpoints) + if numEndpoints != 0 { + t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", 0, numEndpoints) } assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SVC-EBDQOQU5SJFXRIL3", "iptables data does not contain chain deletion") assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SEP-SU5STNODRYEWJAUF", "iptables data does not contain endpoint deletions") + + // force a full sync and count + fp.forceSyncProxyRules() + _, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0") + if numEndpoints != expectedEndpoints { + t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints) + } } // Test calling syncProxyRules() multiple times with various changes func TestSyncProxyRulesRepeated(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)() + ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) @@ -7750,7 +7777,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) { `) assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) - // Add a new service and its endpoints + // Add a new service and its endpoints. (This will only sync the SVC and SEP rules + // for the new service, not the existing ones.) makeServiceMap(fp, makeTestService("ns3", "svc3", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP @@ -7796,11 +7824,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SEP-UHEGFW77JX3KXTOV - [0:0] - :KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 -j KUBE-SVC-2VJB64SDSIJUP5T6 -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK @@ -7811,21 +7835,13 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -s 10.0.2.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -m tcp -p tcp -j DNAT --to-destination 10.0.2.1:8080 - -A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 -> 10.0.2.1:8080" -j KUBE-SEP-UHEGFW77JX3KXTOV -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Delete a service + // Delete a service. (Won't update the other services.) fp.OnServiceDelete(svc2) fp.syncProxyRules() @@ -7845,12 +7861,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] :KUBE-SEP-UHEGFW77JX3KXTOV - [0:0] :KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0] - :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS @@ -7858,21 +7870,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO -X KUBE-SEP-UHEGFW77JX3KXTOV -X KUBE-SVC-2VJB64SDSIJUP5T6 COMMIT `) - assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Add a service, sync, then add its endpoints + // Add a service, sync, then add its endpoints. (The first sync will be a no-op other + // than adding the REJECT rule. The second sync will create the new service.) makeServiceMap(fp, makeTestService("ns4", "svc4", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP @@ -7902,10 +7907,6 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS @@ -7913,17 +7914,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) populateEndpointSlices(fp, makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) { @@ -7956,11 +7949,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0] - :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] :KUBE-SVC-4SW47YFZTEDKD3PK - [0:0] - :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK @@ -7971,21 +7960,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Change an endpoint of an existing service + // Change an endpoint of an existing service. This will cause its SVC and SEP + // chains to be rewritten. eps3update := eps3.DeepCopy() eps3update.Endpoints[0].Addresses[0] = "10.0.3.2" fp.OnEndpointSliceUpdate(eps3, eps3update) @@ -8007,13 +7989,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0] :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] :KUBE-SEP-DKCFIS26GWF2WLWC - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SVC-4SW47YFZTEDKD3PK - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK @@ -8022,24 +8000,16 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -j KUBE-SEP-DKCFIS26GWF2WLWC - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO -X KUBE-SEP-BSWRHOQ77KEXZLNL COMMIT `) - assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Add an endpoint to a service + // Add an endpoint to a service. This will cause its SVC and SEP chains to be rewritten. eps3update2 := eps3update.DeepCopy() eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}}) fp.OnEndpointSliceUpdate(eps3update, eps3update2) @@ -8061,13 +8031,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0] :KUBE-SEP-DKCFIS26GWF2WLWC - [0:0] :KUBE-SEP-JVVZVJ7BSEPPRNBS - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SVC-4SW47YFZTEDKD3PK - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK @@ -8076,26 +8042,18 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80 -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Sync with no new changes... + // Sync with no new changes... This will not rewrite any SVC or SEP chains fp.syncProxyRules() expected = dedent.Dedent(` @@ -8114,13 +8072,6 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0] - :KUBE-SEP-DKCFIS26GWF2WLWC - [0:0] - :KUBE-SEP-JVVZVJ7BSEPPRNBS - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SVC-4SW47YFZTEDKD3PK - [0:0] - :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK @@ -8129,24 +8080,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 - -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ - -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80 - -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ - -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) } func TestNoEndpointsMetric(t *testing.T) { diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 2346565ed25..6e4c8203cff 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -327,6 +327,20 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { return len(sct.items) > 0 } +// PendingChanges returns a set whose keys are the names of the services that have changed +// since the last time sct was used to update a ServiceMap. (You must call this _before_ +// calling sm.Update(sct).) +func (sct *ServiceChangeTracker) PendingChanges() sets.String { + sct.lock.Lock() + defer sct.lock.Unlock() + + changes := sets.NewString() + for name := range sct.items { + changes.Insert(name.String()) + } + return changes +} + // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { // HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node. diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index 0f35b5dc386..f7959446d2c 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -564,6 +564,10 @@ func TestServiceMapUpdateHeadless(t *testing.T) { ) // Headless service should be ignored + pending := fp.serviceChanges.PendingChanges() + if pending.Len() != 0 { + t.Errorf("expected 0 pending service changes, got %d", pending.Len()) + } result := fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) @@ -591,6 +595,10 @@ func TestUpdateServiceTypeExternalName(t *testing.T) { }), ) + pending := fp.serviceChanges.PendingChanges() + if pending.Len() != 0 { + t.Errorf("expected 0 pending service changes, got %d", pending.Len()) + } result := fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.serviceMap) @@ -651,6 +659,18 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.addService(services[i]) } + + pending := fp.serviceChanges.PendingChanges() + for i := range services { + name := services[i].Namespace + "/" + services[i].Name + if !pending.Has(name) { + t.Errorf("expected pending change for %q", name) + } + } + if pending.Len() != len(services) { + t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len()) + } + result := fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 8 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) @@ -684,6 +704,10 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.deleteService(services[2]) fp.deleteService(services[3]) + pending = fp.serviceChanges.PendingChanges() + if pending.Len() != 4 { + t.Errorf("expected 4 pending service changes, got %d", pending.Len()) + } result = fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.serviceMap) @@ -733,6 +757,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.addService(servicev1) + pending := fp.serviceChanges.PendingChanges() + if pending.Len() != 1 { + t.Errorf("expected 1 pending service change, got %d", pending.Len()) + } result := fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) @@ -747,6 +775,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // Change service to load-balancer fp.updateService(servicev1, servicev2) + pending = fp.serviceChanges.PendingChanges() + if pending.Len() != 1 { + t.Errorf("expected 1 pending service change, got %d", pending.Len()) + } result = fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) @@ -761,6 +793,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // No change; make sure the service map stays the same and there are // no health-check changes fp.updateService(servicev2, servicev2) + pending = fp.serviceChanges.PendingChanges() + if pending.Len() != 0 { + t.Errorf("expected 0 pending service changes, got %d", pending.Len()) + } result = fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) @@ -774,6 +810,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // And back to ClusterIP fp.updateService(servicev2, servicev1) + pending = fp.serviceChanges.PendingChanges() + if pending.Len() != 1 { + t.Errorf("expected 1 pending service change, got %d", pending.Len()) + } result = fp.serviceMap.Update(fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap)