diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index b2e0323c6ea..409a2c9f567 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -557,6 +557,13 @@ const ( // Enable MinDomains in Pod Topology Spread. MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread" + // owner: @danwinship + // kep: https://kep.k8s.io/3453 + // alpha: v1.26 + // + // Enables new performance-improving code in kube-proxy iptables mode + MinimizeIPTablesRestore featuregate.Feature = "MinimizeIPTablesRestore" + // owner: @janosi @bridgetkromhout // kep: https://kep.k8s.io/1435 // alpha: v1.20 @@ -959,6 +966,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Beta}, + MinimizeIPTablesRestore: {Default: false, PreRelease: featuregate.Alpha}, + MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta}, MultiCIDRRangeAllocator: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 1969fb4b21f..ca7071a9eb6 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -302,6 +302,24 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E return changeNeeded } +// PendingChanges returns a set whose keys are the names of the services whose endpoints +// have changed since the last time ect was used to update an EndpointsMap. (You must call +// this _before_ calling em.Update(ect).) +func (ect *EndpointChangeTracker) PendingChanges() sets.String { + if ect.endpointSliceCache != nil { + return ect.endpointSliceCache.pendingChanges() + } + + ect.lock.Lock() + defer ect.lock.Unlock() + + changes := sets.NewString() + for name := range ect.items { + changes.Insert(name.String()) + } + return changes +} + // checkoutChanges returns a list of pending endpointsChanges and marks them as // applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 4e900ba85ae..660a3a2efe7 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -825,6 +825,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints []ServiceEndpoint expectedStaleServiceNames map[ServicePortName]bool expectedHealthchecks map[types.NamespacedName]int + expectedChangedEndpoints sets.String }{{ name: "empty", oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, @@ -832,6 +833,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, unnamed port", previousEndpoints: []*v1.Endpoints{ @@ -853,6 +855,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, named port, local", previousEndpoints: []*v1.Endpoints{ @@ -876,6 +879,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple subsets", previousEndpoints: []*v1.Endpoints{ @@ -903,6 +907,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple subsets, multiple ports, local", previousEndpoints: []*v1.Endpoints{ @@ -938,6 +943,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple endpoints, subsets, IPs, and ports", previousEndpoints: []*v1.Endpoints{ @@ -1006,6 +1012,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, + expectedChangedEndpoints: sets.NewString(), }, { name: "add an Endpoints", previousEndpoints: []*v1.Endpoints{ @@ -1027,6 +1034,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "remove an Endpoints", previousEndpoints: []*v1.Endpoints{ @@ -1047,6 +1055,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add an IP and port", previousEndpoints: []*v1.Endpoints{ @@ -1077,6 +1086,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "remove an IP and port", previousEndpoints: []*v1.Endpoints{ @@ -1112,6 +1122,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add a subset", previousEndpoints: 
[]*v1.Endpoints{ @@ -1140,6 +1151,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "remove a subset", previousEndpoints: []*v1.Endpoints{ @@ -1167,6 +1179,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "rename a port", previousEndpoints: []*v1.Endpoints{ @@ -1192,7 +1205,8 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "renumber a port", previousEndpoints: []*v1.Endpoints{ @@ -1217,6 +1231,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "complex add and remove", previousEndpoints: []*v1.Endpoints{ @@ -1292,6 +1307,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, + expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), }, { name: "change from 0 endpoint address to 1 unnamed port", previousEndpoints: []*v1.Endpoints{ @@ -1310,7 +1326,8 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedHealthchecks: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, } @@ -1346,6 +1363,12 @@ func TestUpdateEndpointsMap(t *testing.T) { fp.updateEndpoints(prev, curr) } } + + pendingChanges := fp.endpointsChanges.PendingChanges() + if !pendingChanges.Equal(tc.expectedChangedEndpoints) { + t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.List(), pendingChanges.List()) + } + result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMapsStr(t, newMap, tc.expectedResult) @@ -1520,13 +1543,14 @@ func TestEndpointSliceUpdate(t *testing.T) { fqdnSlice.AddressType = discovery.AddressTypeFQDN testCases := map[string]struct { - startingSlices []*discovery.EndpointSlice - endpointChangeTracker *EndpointChangeTracker - namespacedName types.NamespacedName - paramEndpointSlice *discovery.EndpointSlice - paramRemoveSlice bool - expectedReturnVal bool - expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo + startingSlices []*discovery.EndpointSlice + endpointChangeTracker *EndpointChangeTracker + namespacedName types.NamespacedName + paramEndpointSlice *discovery.EndpointSlice + paramRemoveSlice bool + expectedReturnVal bool + expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo + expectedChangedEndpoints sets.String }{ // test starting from an empty state "add a simple slice that doesn't already exist": { @@ -1548,30 +1572,33 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: 
sets.NewString("ns1/svc1"), }, // test no modification to state - current change should be nil as nothing changes "add the same slice that already exists": { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: false, - expectedCurrentChange: nil, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, + expectedChangedEndpoints: sets.NewString(), }, // ensure that only valide address types are processed "add an FQDN slice (invalid address type)": { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: fqdnSlice, - paramRemoveSlice: false, - expectedReturnVal: false, - expectedCurrentChange: nil, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: fqdnSlice, + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, + expectedChangedEndpoints: sets.NewString(), }, // test additions to existing state "add a slice that overlaps with existing state": { @@ -1604,6 +1631,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // test additions to existing state with partially overlapping slices and ports "add a slice that overlaps with existing state and partial ports": { @@ -1634,6 +1662,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // test deletions from existing state with partially overlapping slices and ports "remove a slice that overlaps with existing state": { @@ -1656,6 +1685,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // ensure a removal that has no effect turns into a no-op "remove a slice that doesn't even exist in current state": { @@ -1663,12 +1693,13 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, 
[]*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: true, - expectedReturnVal: false, - expectedCurrentChange: nil, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: true, + expectedReturnVal: false, + expectedCurrentChange: nil, + expectedChangedEndpoints: sets.NewString(), }, // start with all endpoints ready, transition to no endpoints ready "transition all endpoints to unready state": { @@ -1692,6 +1723,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true, Ready: false, Serving: false, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // start with no endpoints ready, transition to all endpoints ready "transition all endpoints to ready state": { @@ -1713,6 +1745,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // start with some endpoints ready, transition to more endpoints ready "transition some endpoints to ready state": { @@ -1741,6 +1774,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: false}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, // start with some endpoints ready, transition to some terminating "transition some endpoints to terminating state": { @@ -1769,6 +1803,7 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: true}, }, }, + expectedChangedEndpoints: sets.NewString("ns1/svc1"), }, } @@ -1783,6 +1818,12 @@ func TestEndpointSliceUpdate(t *testing.T) { if tc.endpointChangeTracker.items == nil { t.Errorf("Expected ect.items to not be nil") } + + pendingChanges := tc.endpointChangeTracker.PendingChanges() + if !pendingChanges.Equal(tc.expectedChangedEndpoints) { + t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.List(), pendingChanges.List()) + } + changes := tc.endpointChangeTracker.checkoutChanges() if tc.expectedCurrentChange == nil { if len(changes) != 0 { diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index b4e52e23e87..6f00982d4e2 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -188,6 +188,21 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint return changed } +// pendingChanges returns a set whose keys are the names of the services whose endpoints +// have changed since the last time checkoutChanges was called +func (cache *EndpointSliceCache) pendingChanges() sets.String { + cache.lock.Lock() + defer cache.lock.Unlock() + + changes := sets.NewString() + for serviceNN, esTracker := range cache.trackerByServiceMap { + if len(esTracker.pending) > 0 { + 
changes.Insert(serviceNN.String()) + } + } + return changes +} + // checkoutChanges returns a list of all endpointsChanges that are // pending and then marks them as applied. func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange { diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1cc9dfb5055..1946e73c940 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,9 +38,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" @@ -163,6 +165,7 @@ type Proxier struct { // updating iptables with some partial data after kube-proxy restart. endpointSlicesSynced bool servicesSynced bool + needFullSync bool initialized int32 syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules syncPeriod time.Duration @@ -298,7 +301,7 @@ func NewProxier(ipt utiliptables.Interface, proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs) go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, - proxier.syncProxyRules, syncPeriod, wait.NeverStop) + proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop) if ipt.HasRandomFully() { klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol()) @@ -539,7 +542,7 @@ func (proxier *Proxier) OnServiceSynced() { proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. - proxier.syncProxyRules() + proxier.forceSyncProxyRules() } // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object @@ -575,7 +578,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. 
- proxier.syncProxyRules() + proxier.forceSyncProxyRules() } // OnNodeAdd is called whenever creation of new node object @@ -596,6 +599,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) { for k, v := range node.Labels { proxier.nodeLabels[k] = v } + proxier.needFullSync = true proxier.mu.Unlock() klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels) @@ -620,6 +624,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { for k, v := range node.Labels { proxier.nodeLabels[k] = v } + proxier.needFullSync = true proxier.mu.Unlock() klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels) @@ -636,6 +641,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) { } proxier.mu.Lock() proxier.nodeLabels = nil + proxier.needFullSync = true proxier.mu.Unlock() proxier.Sync() @@ -769,6 +775,17 @@ func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string return append(args, "-m", "comment", "--comment", svcName) } +// Called by the iptables.Monitor, and in response to topology changes; this calls +// syncProxyRules() and tells it to resync all services, regardless of whether the +// Service or Endpoints/EndpointSlice objects themselves have changed +func (proxier *Proxier) forceSyncProxyRules() { + proxier.mu.Lock() + proxier.needFullSync = true + proxier.mu.Unlock() + + proxier.syncProxyRules() +} + // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // This assumes proxier.mu is NOT held @@ -789,9 +806,12 @@ func (proxier *Proxier) syncProxyRules() { klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start)) }() - // We assume that if this was called, we really want to sync them, - // even if nothing changed in the meantime. In other words, callers are - // responsible for detecting no-op changes and not calling this function. + tryPartialSync := !proxier.needFullSync && utilfeature.DefaultFeatureGate.Enabled(features.MinimizeIPTablesRestore) + var serviceChanged, endpointsChanged sets.String + if tryPartialSync { + serviceChanged = proxier.serviceChanges.PendingChanges() + endpointsChanged = proxier.endpointsChanges.PendingChanges() + } serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) @@ -826,6 +846,13 @@ func (proxier *Proxier) syncProxyRules() { if !success { klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod) proxier.syncRunner.RetryAfter(proxier.syncPeriod) + if tryPartialSync { + metrics.IptablesPartialRestoreFailuresTotal.Inc() + } + // proxier.serviceChanges and proxier.endpointsChanges have already + // been flushed, so we've lost the state needed to be able to do + // a partial sync. + proxier.needFullSync = true } }() @@ -1184,6 +1211,13 @@ func (proxier *Proxier) syncProxyRules() { ) } + // If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync + // then we can omit them from the restore input. (We have already marked + // them in activeNATChains, so they won't get deleted.) + if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) { + continue + } + // Set up internal traffic handling.
if hasInternalEndpoints { args = append(args[:0], @@ -1479,6 +1513,7 @@ func (proxier *Proxier) syncProxyRules() { return } success = true + proxier.needFullSync = false for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { for _, lastChangeTriggerTime := range lastChangeTriggerTimes { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 4f2a194ae23..38d5f2acd1e 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1343,8 +1343,10 @@ func getLine() int { } // assertIPTablesRulesEqual asserts that the generated rules in result match the rules in -// expected, ignoring irrelevant ordering differences. -func assertIPTablesRulesEqual(t *testing.T, line int, expected, result string) { +// expected, ignoring irrelevant ordering differences. By default this also checks the +// rules for consistency (eg, no jumps to chains that aren't defined), but that can be +// disabled by passing false for checkConsistency if you are passing a partial set of rules. +func assertIPTablesRulesEqual(t *testing.T, line int, checkConsistency bool, expected, result string) { expected = strings.TrimLeft(expected, " \t\n") result, err := sortIPTablesRules(strings.TrimLeft(result, " \t\n")) @@ -1360,9 +1362,11 @@ func assertIPTablesRulesEqual(t *testing.T, line int, expected, result string) { t.Errorf("rules do not match%s:\ndiff:\n%s\nfull result:\n```\n%s```", lineStr, diff, result) } - err = checkIPTablesRuleJumps(expected) - if err != nil { - t.Fatalf("%s", err) + if checkConsistency { + err = checkIPTablesRuleJumps(expected) + if err != nil { + t.Fatalf("%s%s", err, lineStr) + } } } @@ -2037,7 +2041,7 @@ func TestOverallIPTablesRulesWithMultipleServices(t *testing.T) { COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) natRulesMetric, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT))) if err != nil { @@ -2099,7 +2103,7 @@ func TestClusterIPReject(t *testing.T) { COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2182,7 +2186,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.0.1:80" -j KUBE-SEP-SXIVWICOYRO3J4NJ COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2303,7 +2307,7 @@ func TestLoadBalancer(t *testing.T) { COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2491,7 +2495,7 @@ func TestNodePort(t *testing.T) { -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.0.1:80" -j KUBE-SEP-SXIVWICOYRO3J4NJ COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2586,7 +2590,7 @@ func 
TestHealthCheckNodePort(t *testing.T) { COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2648,7 +2652,7 @@ func TestMasqueradeRule(t *testing.T) { } else { expected = fmt.Sprintf(expectedFmt, "") } - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) } } @@ -2704,7 +2708,7 @@ func TestExternalIPsReject(t *testing.T) { -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2813,7 +2817,7 @@ func TestOnlyLocalExternalIPs(t *testing.T) { -A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2921,7 +2925,7 @@ func TestNonLocalExternalIPs(t *testing.T) { -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -2994,7 +2998,7 @@ func TestNodePortReject(t *testing.T) { -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -3086,7 +3090,7 @@ func TestLoadBalancerReject(t *testing.T) { -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -3219,7 +3223,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { -A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{ { @@ -3395,7 +3399,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable fp.syncProxyRules() - assertIPTablesRulesEqual(t, line, expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, line, true, expected, fp.iptablesData.String()) runPacketFlowTests(t, line, ipt, testNodeIP, []packetFlowTest{ { @@ -4742,7 +4746,7 @@ func TestEndpointSliceE2E(t *testing.T) { fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() - assertIPTablesRulesEqual(t, getLine(), expectedIPTablesWithSlice, 
fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expectedIPTablesWithSlice, fp.iptablesData.String()) fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() @@ -5345,7 +5349,7 @@ func TestInternalTrafficPolicyE2E(t *testing.T) { fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() - assertIPTablesRulesEqual(t, tc.line, tc.expectedIPTablesWithSlice, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, tc.line, true, tc.expectedIPTablesWithSlice, fp.iptablesData.String()) runPacketFlowTests(t, tc.line, ipt, testNodeIP, tc.flowTests) fp.OnEndpointSliceDelete(endpointSlice) @@ -6122,14 +6126,14 @@ func TestEndpointSliceWithTerminatingEndpointsTrafficPolicyLocal(t *testing.T) { fp.OnEndpointSliceAdd(testcase.endpointslice) fp.syncProxyRules() - assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String()) runPacketFlowTests(t, testcase.line, ipt, testNodeIP, testcase.flowTests) fp.OnEndpointSliceDelete(testcase.endpointslice) fp.syncProxyRules() if testcase.noUsableEndpoints { // Deleting the EndpointSlice should have had no effect - assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String()) } else { assertIPTablesRulesNotEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String()) } @@ -6868,14 +6872,14 @@ func TestEndpointSliceWithTerminatingEndpointsTrafficPolicyCluster(t *testing.T) fp.OnEndpointSliceAdd(testcase.endpointslice) fp.syncProxyRules() - assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String()) runPacketFlowTests(t, testcase.line, ipt, testNodeIP, testcase.flowTests) fp.OnEndpointSliceDelete(testcase.endpointslice) fp.syncProxyRules() if testcase.noUsableEndpoints { // Deleting the EndpointSlice should have had no effect - assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String()) } else { assertIPTablesRulesNotEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String()) } @@ -7487,6 +7491,8 @@ func countEndpointsAndComments(iptablesData string, matchEndpoint string) (strin } func TestSyncProxyRulesLargeClusterMode(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)() + ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) fp.masqueradeAll = true @@ -7578,6 +7584,22 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { }} })) fp.syncProxyRules() + + firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "203.0.113.4") + assert.Equal(t, "-A KUBE-SEP-RUVVH7YV3PHQBDOS -m tcp -p tcp -j DNAT --to-destination 203.0.113.4:8081", firstEndpoint) + // syncProxyRules will only have output the endpoints for svc3, since the others + // didn't change (and syncProxyRules doesn't automatically do a full resync when you + // cross the largeClusterEndpointsThreshold). 
+ if numEndpoints != 3 { + t.Errorf("Found wrong number of endpoints on partial resync: expected %d, got %d", 3, numEndpoints) + } + if numComments != 0 { + t.Errorf("numComments (%d) != 0 after partial resync when numEndpoints (%d) > threshold (%d)", numComments, expectedEndpoints+3, largeClusterEndpointsThreshold) + } + + // Now force a full resync and confirm that it rewrites the older services with + // no comments as well. + fp.forceSyncProxyRules() expectedEndpoints += 3 firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0") @@ -7614,12 +7636,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { }} })) fp.syncProxyRules() - expectedEndpoints += 1 svc4Endpoint, numEndpoints, _ := countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") assert.Equal(t, "-A KUBE-SEP-SU5STNODRYEWJAUF -m tcp -p tcp -j DNAT --to-destination 10.4.0.1:8082", svc4Endpoint, "svc4 endpoint was not created") - if numEndpoints != expectedEndpoints { - t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", expectedEndpoints, numEndpoints) + // should only sync svc4 + if numEndpoints != 1 { + t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", 1, numEndpoints) } // In large-cluster mode, if we delete a service, it will not re-sync its chains @@ -7627,12 +7649,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { fp.lastIPTablesCleanup = time.Now() fp.OnServiceDelete(svc4) fp.syncProxyRules() - expectedEndpoints -= 1 svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!") - if numEndpoints != expectedEndpoints { - t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", expectedEndpoints, numEndpoints) + // should only sync svc4, and shouldn't output its endpoints + if numEndpoints != 0 { + t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", 0, numEndpoints) } assert.NotContains(t, fp.iptablesData.String(), "-X ", "iptables data unexpectedly contains chain deletions") @@ -7642,17 +7664,27 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) { svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1") assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!") - if numEndpoints != expectedEndpoints { - t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", expectedEndpoints, numEndpoints) + if numEndpoints != 0 { + t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", 0, numEndpoints) } assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SVC-EBDQOQU5SJFXRIL3", "iptables data does not contain chain deletion") assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SEP-SU5STNODRYEWJAUF", "iptables data does not contain endpoint deletions") + + // force a full sync and count + fp.forceSyncProxyRules() + _, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0") + if numEndpoints != expectedEndpoints { + t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints) + } } // Test calling syncProxyRules() multiple times with various changes func TestSyncProxyRulesRepeated(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)() + ipt := 
iptablestest.NewFake() fp := NewFakeProxier(ipt) + metrics.RegisterMetrics() // Create initial state var svc2 *v1.Service @@ -7744,9 +7776,10 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) - // Add a new service and its endpoints + // Add a new service and its endpoints. (This will only sync the SVC and SEP rules + // for the new service, not the existing ones.) makeServiceMap(fp, makeTestService("ns3", "svc3", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP @@ -7792,11 +7825,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SEP-UHEGFW77JX3KXTOV - [0:0] - :KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 -j KUBE-SVC-2VJB64SDSIJUP5T6 -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK @@ -7807,21 +7836,13 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -s 10.0.2.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -m tcp -p tcp -j DNAT --to-destination 10.0.2.1:8080 - -A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 -> 10.0.2.1:8080" -j KUBE-SEP-UHEGFW77JX3KXTOV -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Delete a service + // Delete a service. (Won't update the other services.) 
fp.OnServiceDelete(svc2) fp.syncProxyRules() @@ -7841,12 +7862,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] :KUBE-SEP-UHEGFW77JX3KXTOV - [0:0] :KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0] - :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS @@ -7854,23 +7871,18 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO -X KUBE-SEP-UHEGFW77JX3KXTOV -X KUBE-SVC-2VJB64SDSIJUP5T6 COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Add a service, sync, then add its endpoints + // Add a service, sync, then add its endpoints. (The first sync will be a no-op other + // than adding the REJECT rule. The second sync will create the new service.) + var svc4 *v1.Service makeServiceMap(fp, makeTestService("ns4", "svc4", func(svc *v1.Service) { + svc4 = svc svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.30.0.44" svc.Spec.Ports = []v1.ServicePort{{ @@ -7898,10 +7910,6 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS @@ -7909,17 +7917,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! 
--mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) populateEndpointSlices(fp, makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) { @@ -7952,11 +7952,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0] - :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] :KUBE-SVC-4SW47YFZTEDKD3PK - [0:0] - :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK @@ -7967,21 +7963,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Change an endpoint of an existing service + // Change an endpoint of an existing service. This will cause its SVC and SEP + // chains to be rewritten. eps3update := eps3.DeepCopy() eps3update.Endpoints[0].Addresses[0] = "10.0.3.2" fp.OnEndpointSliceUpdate(eps3, eps3update) @@ -8003,13 +7992,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0] :KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0] :KUBE-SEP-DKCFIS26GWF2WLWC - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SVC-4SW47YFZTEDKD3PK - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK @@ -8018,24 +8003,16 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -j KUBE-SEP-DKCFIS26GWF2WLWC - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO -X KUBE-SEP-BSWRHOQ77KEXZLNL COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Add an endpoint to a service + // Add an endpoint to a service. This will cause its SVC and SEP chains to be rewritten. eps3update2 := eps3update.DeepCopy() eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}}) fp.OnEndpointSliceUpdate(eps3update, eps3update2) @@ -8057,13 +8034,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-POSTROUTING - [0:0] - :KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0] :KUBE-SEP-DKCFIS26GWF2WLWC - [0:0] :KUBE-SEP-JVVZVJ7BSEPPRNBS - [0:0] - :KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0] - :KUBE-SVC-4SW47YFZTEDKD3PK - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] - :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK @@ -8072,26 +8045,95 @@ func TestSyncProxyRulesRepeated(t *testing.T) { -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80 -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80 - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) - // Sync with no new changes... + // Sync with no new changes... This will not rewrite any SVC or SEP chains + fp.syncProxyRules() + + expected = dedent.Dedent(` + *filter + :KUBE-NODEPORTS - [0:0] + :KUBE-SERVICES - [0:0] + :KUBE-EXTERNAL-SERVICES - [0:0] + :KUBE-FORWARD - [0:0] + :KUBE-PROXY-FIREWALL - [0:0] + -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT + -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT + COMMIT + *nat + :KUBE-NODEPORTS - [0:0] + :KUBE-SERVICES - [0:0] + :KUBE-MARK-MASQ - [0:0] + :KUBE-POSTROUTING - [0:0] + -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O + -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK + -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK + -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS + -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 + -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN + -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 + -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE + COMMIT + `) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) + + // Now force a partial resync error and ensure that it recovers correctly + if fp.needFullSync { + t.Fatalf("Proxier unexpectedly already needs a full sync?") + } + prFailures, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal) + if err != nil { + t.Fatalf("Could not get partial restore failures metric: %v", err) + } + if prFailures != 0.0 { + t.Errorf("Already did a partial resync? Something failed earlier!") + } + + // Add a rule jumping from svc3's service chain to svc4's endpoint, then try to + // delete svc4. This will fail because the partial resync won't rewrite svc3's + // rules and so the partial restore would leave a dangling jump from there to + // svc4's endpoint. The proxier will then queue a full resync in response to the + // partial resync failure, and the full resync will succeed (since it will rewrite + // svc3's rules as well). + // + // This is an absurd scenario, but it has to be; partial resync failures are + // supposed to be impossible; if we knew of any non-absurd scenario that would + // cause such a failure, then that would be a bug and we would fix it. 
+ if _, err := fp.iptables.ChainExists(utiliptables.TableNAT, utiliptables.Chain("KUBE-SEP-AYCN5HPXMIRJNJXU")); err != nil { + t.Fatalf("svc4's endpoint chain unexpectedly already does not exist!") + } + if _, err := fp.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.Chain("KUBE-SVC-X27LE4BHSL4DOUIK"), "-j", "KUBE-SEP-AYCN5HPXMIRJNJXU"); err != nil { + t.Fatalf("Could not add bad iptables rule: %v", err) + } + + fp.OnServiceDelete(svc4) + fp.syncProxyRules() + + if _, err := fp.iptables.ChainExists(utiliptables.TableNAT, utiliptables.Chain("KUBE-SEP-AYCN5HPXMIRJNJXU")); err != nil { + t.Errorf("svc4's endpoint chain was successfully deleted despite dangling references!") + } + if !fp.needFullSync { + t.Errorf("Proxier did not fail on previous partial resync?") + } + updatedPRFailures, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal) + if err != nil { + t.Errorf("Could not get partial restore failures metric: %v", err) + } + if updatedPRFailures != prFailures+1.0 { + t.Errorf("Partial restore failures metric was not incremented after failed partial resync (expected %.02f, got %.02f)", prFailures+1.0, updatedPRFailures) + } + + // On retry we should do a full resync, which should succeed (and delete svc4) fp.syncProxyRules() expected = dedent.Dedent(` @@ -8119,30 +8161,27 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK - -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ - -A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80 -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ -A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80 -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ -A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80 -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ -A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ - -A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 !
-s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO + -X KUBE-SEP-AYCN5HPXMIRJNJXU + -X KUBE-SVC-4SW47YFZTEDKD3PK COMMIT `) - assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String()) + assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) } func TestNoEndpointsMetric(t *testing.T) { diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go index b5601f697d8..27304214d99 100644 --- a/pkg/proxy/metrics/metrics.go +++ b/pkg/proxy/metrics/metrics.go @@ -126,6 +126,17 @@ var ( }, ) + // IptablesPartialRestoreFailuresTotal is the number of iptables *partial* restore + // failures (resulting in a fall back to a full restore) that the proxy has seen. + IptablesPartialRestoreFailuresTotal = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: kubeProxySubsystem, + Name: "sync_proxy_rules_iptables_partial_restore_failures_total", + Help: "Cumulative proxy iptables partial restore failures", + StabilityLevel: metrics.ALPHA, + }, + ) + // IptablesRulesTotal is the number of iptables rules that the iptables proxy installs. IptablesRulesTotal = metrics.NewGaugeVec( &metrics.GaugeOpts{ @@ -177,6 +188,7 @@ func RegisterMetrics() { legacyregistry.MustRegister(ServiceChangesTotal) legacyregistry.MustRegister(IptablesRulesTotal) legacyregistry.MustRegister(IptablesRestoreFailuresTotal) + legacyregistry.MustRegister(IptablesPartialRestoreFailuresTotal) legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp) legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal) }) diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index c3b1f2b8013..e3d39f4c7cf 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -328,6 +328,20 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { return len(sct.items) > 0 } +// PendingChanges returns a set whose keys are the names of the services that have changed +// since the last time sct was used to update a ServiceMap. (You must call this _before_ +// calling sm.Update(sct).) +func (sct *ServiceChangeTracker) PendingChanges() sets.String { + sct.lock.Lock() + defer sct.lock.Unlock() + + changes := sets.NewString() + for name := range sct.items { + changes.Insert(name.String()) + } + return changes +} + // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { // HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node. 
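Between the two trackers, PendingChanges is what lets the iptables proxier decide, per service, whether its KUBE-SVC-*/KUBE-SEP-* chains actually need to be rewritten on a given sync. A rough sketch of the intended consumption pattern (illustrative; the variable names and the doFullSync flag here are assumptions, not code from this patch):

	// Snapshot the pending sets *before* Update() consumes the trackers.
	serviceChanged := proxier.serviceChanges.PendingChanges()
	endpointsChanged := proxier.endpointsChanges.PendingChanges()
	proxier.serviceMap.Update(proxier.serviceChanges)
	proxier.endpointsMap.Update(proxier.endpointsChanges)

	for svcName := range proxier.serviceMap {
		name := svcName.NamespacedName.String() // "namespace/name", matching the set keys
		if doFullSync || serviceChanged.Has(name) || endpointsChanged.Has(name) {
			// Write this service's chains into the iptables-restore input.
		}
		// Otherwise omit them entirely; iptables-restore leaves chains that
		// are not mentioned in its input untouched.
	}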
diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go
index 0f35b5dc386..f7959446d2c 100644
--- a/pkg/proxy/service_test.go
+++ b/pkg/proxy/service_test.go
@@ -564,6 +564,10 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
 	)
 
 	// Headless service should be ignored
+	pending := fp.serviceChanges.PendingChanges()
+	if pending.Len() != 0 {
+		t.Errorf("expected 0 pending service changes, got %d", pending.Len())
+	}
 	result := fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 0 {
 		t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
@@ -591,6 +595,10 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
 		}),
 	)
 
+	pending := fp.serviceChanges.PendingChanges()
+	if pending.Len() != 0 {
+		t.Errorf("expected 0 pending service changes, got %d", pending.Len())
+	}
 	result := fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 0 {
 		t.Errorf("expected service map length 0, got %v", fp.serviceMap)
@@ -651,6 +659,18 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 	for i := range services {
 		fp.addService(services[i])
 	}
+
+	pending := fp.serviceChanges.PendingChanges()
+	for i := range services {
+		name := services[i].Namespace + "/" + services[i].Name
+		if !pending.Has(name) {
+			t.Errorf("expected pending change for %q", name)
+		}
+	}
+	if pending.Len() != len(services) {
+		t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len())
+	}
+
 	result := fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 8 {
 		t.Errorf("expected service map length 8, got %v", fp.serviceMap)
@@ -684,6 +704,10 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 	fp.deleteService(services[2])
 	fp.deleteService(services[3])
 
+	pending = fp.serviceChanges.PendingChanges()
+	if pending.Len() != 4 {
+		t.Errorf("expected 4 pending service changes, got %d", pending.Len())
+	}
 	result = fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 1 {
 		t.Errorf("expected service map length 1, got %v", fp.serviceMap)
@@ -733,6 +757,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 
 	fp.addService(servicev1)
 
+	pending := fp.serviceChanges.PendingChanges()
+	if pending.Len() != 1 {
+		t.Errorf("expected 1 pending service change, got %d", pending.Len())
+	}
 	result := fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 2 {
 		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@@ -747,6 +775,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 
 	// Change service to load-balancer
 	fp.updateService(servicev1, servicev2)
+	pending = fp.serviceChanges.PendingChanges()
+	if pending.Len() != 1 {
+		t.Errorf("expected 1 pending service change, got %d", pending.Len())
+	}
 	result = fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 2 {
 		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@@ -761,6 +793,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 
 	// No change; make sure the service map stays the same and there are
 	// no health-check changes
 	fp.updateService(servicev2, servicev2)
+	pending = fp.serviceChanges.PendingChanges()
+	if pending.Len() != 0 {
+		t.Errorf("expected 0 pending service changes, got %d", pending.Len())
+	}
 	result = fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 2 {
 		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@@ -774,6 +810,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 
 	// And back to ClusterIP
 	fp.updateService(servicev2, servicev1)
+	pending = fp.serviceChanges.PendingChanges()
+	if pending.Len() != 1 {
+		t.Errorf("expected 1 pending service change, got %d", pending.Len())
+	}
 	result = fp.serviceMap.Update(fp.serviceChanges)
 	if len(fp.serviceMap) != 2 {
 		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
diff --git a/pkg/util/iptables/testing/fake.go b/pkg/util/iptables/testing/fake.go
index 7b134a13cb9..165d364eddf 100644
--- a/pkg/util/iptables/testing/fake.go
+++ b/pkg/util/iptables/testing/fake.go
@@ -22,6 +22,7 @@ import (
 	"strings"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/iptables"
 )
 
@@ -217,16 +218,22 @@ func (f *FakeIPTables) SaveInto(table iptables.Table, buffer *bytes.Buffer) erro
 	return f.saveTable(table, buffer)
 }
 
-func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error {
+// This is not a complete list but it's enough to pass the unit tests
+var builtinTargets = sets.NewString("ACCEPT", "DROP", "RETURN", "REJECT", "DNAT", "SNAT", "MASQUERADE", "MARK")
+
+func (f *FakeIPTables) restoreTable(newDump *IPTablesDump, newTable *Table, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error {
 	oldTable, err := f.Dump.GetTable(newTable.Name)
 	if err != nil {
 		return err
 	}
 
+	backupChains := make([]Chain, len(oldTable.Chains))
+	copy(backupChains, oldTable.Chains)
+
+	// Update internal state
 	if flush == iptables.FlushTables {
 		oldTable.Chains = make([]Chain, 0, len(newTable.Chains))
 	}
-
 	for _, newChain := range newTable.Chains {
 		oldChain, _ := f.Dump.GetChain(newTable.Name, newChain.Name)
 		switch {
@@ -235,7 +242,6 @@ func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, c
 		case oldChain == nil && !newChain.Deleted:
 			oldTable.Chains = append(oldTable.Chains, newChain)
 		case oldChain != nil && newChain.Deleted:
-			// FIXME: should make sure chain is not referenced from other jumps
 			_ = f.DeleteChain(newTable.Name, newChain.Name)
 		case oldChain != nil && !newChain.Deleted:
 			// replace old data with new
@@ -246,6 +252,35 @@ func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, c
 			}
 		}
 	}
+
+	// Now check that all old/new jumps are valid
+	for _, chain := range oldTable.Chains {
+		for _, rule := range chain.Rules {
+			if rule.Jump == nil {
+				continue
+			}
+			if builtinTargets.Has(rule.Jump.Value) {
+				continue
+			}
+
+			jumpedChain, _ := f.Dump.GetChain(oldTable.Name, iptables.Chain(rule.Jump.Value))
+			if jumpedChain == nil {
+				newChain, _ := newDump.GetChain(oldTable.Name, iptables.Chain(rule.Jump.Value))
+				if newChain != nil {
+					// rule is an old rule that jumped to a chain which
+					// was deleted by newDump.
+					oldTable.Chains = backupChains
+					return fmt.Errorf("deleted chain %q is referenced by existing rules", newChain.Name)
+				} else {
+					// rule is a new rule that jumped to a chain that was
+					// neither created nor pre-existing
+					oldTable.Chains = backupChains
+					return fmt.Errorf("rule %q jumps to a non-existent chain", rule.Raw)
+				}
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -261,7 +296,7 @@ func (f *FakeIPTables) Restore(table iptables.Table, data []byte, flush iptables
 		return err
 	}
 
-	return f.restoreTable(newTable, flush, counters)
+	return f.restoreTable(dump, newTable, flush, counters)
 }
 
 // RestoreAll is part of iptables.Interface
@@ -272,7 +307,7 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter
 	}
 
 	for i := range dump.Tables {
-		err = f.restoreTable(&dump.Tables[i], flush, counters)
+		err = f.restoreTable(dump, &dump.Tables[i], flush, counters)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/util/iptables/testing/fake_test.go b/pkg/util/iptables/testing/fake_test.go
index 933121bef5d..413610a6d81 100644
--- a/pkg/util/iptables/testing/fake_test.go
+++ b/pkg/util/iptables/testing/fake_test.go
@@ -170,10 +170,12 @@ func TestFakeIPTables(t *testing.T) {
 		*nat
 		:KUBE-RESTORED - [0:0]
 		:KUBE-MISC-CHAIN - [0:0]
+		:KUBE-MISC-TWO - [0:0]
 		:KUBE-EMPTY - [0:0]
 		-A KUBE-RESTORED -m comment --comment "restored chain" -j ACCEPT
-		-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
+		-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
 		-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
+		-A KUBE-MISC-TWO -j ACCEPT
 		COMMIT
 		`, "\n"))
 	err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.NoRestoreCounters)
@@ -196,11 +198,13 @@ func TestFakeIPTables(t *testing.T) {
 		:KUBE-TEST - [0:0]
 		:KUBE-RESTORED - [0:0]
 		:KUBE-MISC-CHAIN - [0:0]
+		:KUBE-MISC-TWO - [0:0]
 		:KUBE-EMPTY - [0:0]
 		-A KUBE-TEST -j ACCEPT
 		-A KUBE-RESTORED -m comment --comment "restored chain" -j ACCEPT
-		-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
+		-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
 		-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
+		-A KUBE-MISC-TWO -j ACCEPT
 		COMMIT
 		*filter
 		:INPUT - [0:0]
@@ -214,6 +218,30 @@ func TestFakeIPTables(t *testing.T) {
 		t.Fatalf("bad post-restore dump. expected:\n%s\n\ngot:\n%s\n", expected, buf.Bytes())
 	}
 
+	// Trying to use Restore to delete a chain that another chain jumps to will fail
+	rules = dedent.Dedent(strings.Trim(`
+		*nat
+		:KUBE-MISC-TWO - [0:0]
+		-X KUBE-MISC-TWO
+		COMMIT
+		`, "\n"))
+	err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.RestoreCounters)
+	if err == nil || !strings.Contains(err.Error(), "referenced by existing rules") {
+		t.Fatalf("Expected 'referenced by existing rules' error from Restore, got %v", err)
+	}
+
+	// Trying to use Restore to add a jump to a non-existent chain will fail
+	rules = dedent.Dedent(strings.Trim(`
+		*nat
+		:KUBE-MISC-TWO - [0:0]
+		-A KUBE-MISC-TWO -j KUBE-MISC-THREE
+		COMMIT
+		`, "\n"))
+	err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.RestoreCounters)
+	if err == nil || !strings.Contains(err.Error(), "non-existent chain") {
+		t.Fatalf("Expected 'non-existent chain' error from Restore, got %v", err)
+	}
+
 	// more Restore; empty out one chain and delete another, but also update its counters
 	rules = dedent.Dedent(strings.Trim(`
 		*nat
@@ -240,9 +268,11 @@ func TestFakeIPTables(t *testing.T) {
 		:POSTROUTING - [0:0]
 		:KUBE-TEST - [99:9999]
 		:KUBE-MISC-CHAIN - [0:0]
+		:KUBE-MISC-TWO - [0:0]
 		:KUBE-EMPTY - [0:0]
-		-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
+		-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
 		-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
+		-A KUBE-MISC-TWO -j ACCEPT
 		COMMIT
 		*filter
 		:INPUT - [0:0]
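The jump validation that restoreTable now performs can be seen in isolation in the following standalone sketch (simplified: chains are a plain map and rules are pre-parsed here, whereas the real FakeIPTables walks its parsed IPTablesDump structures):

	package main

	import "fmt"

	// Simplified stand-in for the builtinTargets set in fake.go.
	var builtinTargets = map[string]bool{
		"ACCEPT": true, "DROP": true, "RETURN": true, "REJECT": true,
		"DNAT": true, "SNAT": true, "MASQUERADE": true, "MARK": true,
	}

	type rule struct {
		raw  string // full rule text, used in error messages
		jump string // target of "-j", or "" if the rule has none
	}

	// validateJumps reports an error if any rule jumps to a target that is
	// neither a builtin nor a chain present in the table, mirroring the check
	// added to FakeIPTables.restoreTable.
	func validateJumps(chains map[string][]rule) error {
		for _, rules := range chains {
			for _, r := range rules {
				if r.jump == "" || builtinTargets[r.jump] {
					continue
				}
				if _, ok := chains[r.jump]; !ok {
					return fmt.Errorf("rule %q jumps to a non-existent chain", r.raw)
				}
			}
		}
		return nil
	}

	func main() {
		chains := map[string][]rule{
			"KUBE-MISC-CHAIN": {{raw: "-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO", jump: "KUBE-MISC-TWO"}},
		}
		// KUBE-MISC-TWO does not exist, so this prints a "non-existent chain" error.
		fmt.Println(validateJumps(chains))
	}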