proxy/iptables: Don't rewrite chains that haven't changed

iptables-restore requires that if you change any rule in a chain, you
have to rewrite the entire chain. But if you avoid mentioning a chain
at all, it will leave it untouched. Take advantage of this by not
rewriting the SVC, SVL, EXT, FW, and SEP chains for services that have
not changed since the last sync, which should drastically cut down on
the size of each iptables-restore in large clusters.
This commit is contained in:
Dan Winship 2022-04-06 08:32:30 -04:00
parent c437b15441
commit ab326d2f4e
8 changed files with 253 additions and 148 deletions

View File

@ -567,6 +567,13 @@ const (
// Enable MinDomains in Pod Topology Spread.
MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread"
// owner: @danwinship
// kep: http://kep.k8s.io/3453
// alpha: v1.26
//
// Enables new performance-improving code in kube-proxy iptables mode
MinimizeIPTablesRestore featuregate.Feature = "MinimizeIPTablesRestore"
// owner: @janosi @bridgetkromhout
// kep: http://kep.k8s.io/1435
// alpha: v1.20
@ -1030,6 +1037,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Beta},
MinimizeIPTablesRestore: {Default: false, PreRelease: featuregate.Alpha},
MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta},
MultiCIDRRangeAllocator: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -302,6 +302,24 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
return changeNeeded
}
// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointChangeTracker) PendingChanges() sets.String {
if ect.endpointSliceCache != nil {
return ect.endpointSliceCache.pendingChanges()
}
ect.lock.Lock()
defer ect.lock.Unlock()
changes := sets.NewString()
for name := range ect.items {
changes.Insert(name.String())
}
return changes
}
// checkoutChanges returns a list of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {

View File

@ -825,6 +825,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints []ServiceEndpoint
expectedStaleServiceNames map[ServicePortName]bool
expectedHealthchecks map[types.NamespacedName]int
expectedChangedEndpoints sets.String
}{{
name: "empty",
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{},
@ -832,6 +833,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, unnamed port",
previousEndpoints: []*v1.Endpoints{
@ -853,6 +855,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, named port, local",
previousEndpoints: []*v1.Endpoints{
@ -876,6 +879,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets",
previousEndpoints: []*v1.Endpoints{
@ -903,6 +907,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets, multiple ports, local",
previousEndpoints: []*v1.Endpoints{
@ -938,6 +943,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple endpoints, subsets, IPs, and ports",
previousEndpoints: []*v1.Endpoints{
@ -1006,6 +1012,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "add an Endpoints",
previousEndpoints: []*v1.Endpoints{
@ -1027,6 +1034,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an Endpoints",
previousEndpoints: []*v1.Endpoints{
@ -1047,6 +1055,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add an IP and port",
previousEndpoints: []*v1.Endpoints{
@ -1077,6 +1086,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an IP and port",
previousEndpoints: []*v1.Endpoints{
@ -1112,6 +1122,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add a subset",
previousEndpoints: []*v1.Endpoints{
@ -1140,6 +1151,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove a subset",
previousEndpoints: []*v1.Endpoints{
@ -1167,6 +1179,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "rename a port",
previousEndpoints: []*v1.Endpoints{
@ -1192,7 +1205,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "renumber a port",
previousEndpoints: []*v1.Endpoints{
@ -1217,6 +1231,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "complex add and remove",
previousEndpoints: []*v1.Endpoints{
@ -1292,6 +1307,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
}, {
name: "change from 0 endpoint address to 1 unnamed port",
previousEndpoints: []*v1.Endpoints{
@ -1310,7 +1326,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
},
}
@ -1346,6 +1363,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
fp.updateEndpoints(prev, curr)
}
}
pendingChanges := fp.endpointsChanges.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.List(), pendingChanges.List())
}
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMapsStr(t, newMap, tc.expectedResult)
@ -1520,13 +1543,14 @@ func TestEndpointSliceUpdate(t *testing.T) {
fqdnSlice.AddressType = discovery.AddressTypeFQDN
testCases := map[string]struct {
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
expectedChangedEndpoints sets.String
}{
// test starting from an empty state
"add a simple slice that doesn't already exist": {
@ -1548,30 +1572,33 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test no modification to state - current change should be nil as nothing changes
"add the same slice that already exists": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// ensure that only valide address types are processed
"add an FQDN slice (invalid address type)": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// test additions to existing state
"add a slice that overlaps with existing state": {
@ -1604,6 +1631,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test additions to existing state with partially overlapping slices and ports
"add a slice that overlaps with existing state and partial ports": {
@ -1634,6 +1662,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test deletions from existing state with partially overlapping slices and ports
"remove a slice that overlaps with existing state": {
@ -1656,6 +1685,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// ensure a removal that has no effect turns into a no-op
"remove a slice that doesn't even exist in current state": {
@ -1663,12 +1693,13 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// start with all endpoints ready, transition to no endpoints ready
"transition all endpoints to unready state": {
@ -1692,6 +1723,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with no endpoints ready, transition to all endpoints ready
"transition all endpoints to ready state": {
@ -1713,6 +1745,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to more endpoints ready
"transition some endpoints to ready state": {
@ -1741,6 +1774,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to some terminating
"transition some endpoints to terminating state": {
@ -1769,6 +1803,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: true},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
}
@ -1783,6 +1818,12 @@ func TestEndpointSliceUpdate(t *testing.T) {
if tc.endpointChangeTracker.items == nil {
t.Errorf("Expected ect.items to not be nil")
}
pendingChanges := tc.endpointChangeTracker.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.List(), pendingChanges.List())
}
changes := tc.endpointChangeTracker.checkoutChanges()
if tc.expectedCurrentChange == nil {
if len(changes) != 0 {

View File

@ -188,6 +188,21 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return changed
}
// pendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time checkoutChanges was called
func (cache *EndpointSliceCache) pendingChanges() sets.String {
cache.lock.Lock()
defer cache.lock.Unlock()
changes := sets.NewString()
for serviceNN, esTracker := range cache.trackerByServiceMap {
if len(esTracker.pending) > 0 {
changes.Insert(serviceNN.String())
}
}
return changes
}
// checkoutChanges returns a list of all endpointsChanges that are
// pending and then marks them as applied.
func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {

View File

@ -38,9 +38,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
@ -163,6 +165,7 @@ type Proxier struct {
// updating iptables with some partial data after kube-proxy restart.
endpointSlicesSynced bool
servicesSynced bool
needFullSync bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
syncPeriod time.Duration
@ -298,7 +301,7 @@ func NewProxier(ipt utiliptables.Interface,
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
proxier.syncProxyRules, syncPeriod, wait.NeverStop)
proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
if ipt.HasRandomFully() {
klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
@ -539,7 +542,7 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
proxier.forceSyncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
@ -575,7 +578,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
proxier.forceSyncProxyRules()
}
// OnNodeAdd is called whenever creation of new node object
@ -596,6 +599,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
@ -620,6 +624,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
@ -636,6 +641,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.Sync()
@ -769,6 +775,17 @@ func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string
return append(args, "-m", "comment", "--comment", svcName)
}
// Called by the iptables.Monitor, and in response to topology changes; this calls
// syncProxyRules() and tells it to resync all services, regardless of whether the
// Service or Endpoints/EndpointSlice objects themselves have changed
func (proxier *Proxier) forceSyncProxyRules() {
proxier.mu.Lock()
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.syncProxyRules()
}
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
@ -789,9 +806,12 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
}()
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
tryPartialSync := !proxier.needFullSync && utilfeature.DefaultFeatureGate.Enabled(features.MinimizeIPTablesRestore)
var serviceChanged, endpointsChanged sets.String
if tryPartialSync {
serviceChanged = proxier.serviceChanges.PendingChanges()
endpointsChanged = proxier.endpointsChanges.PendingChanges()
}
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
@ -826,6 +846,10 @@ func (proxier *Proxier) syncProxyRules() {
if !success {
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
// proxier.serviceChanges and proxier.endpointChanges have already
// been flushed, so we've lost the state needed to be able to do
// a partial sync.
proxier.needFullSync = true
}
}()
@ -1184,6 +1208,13 @@ func (proxier *Proxier) syncProxyRules() {
)
}
// If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync
// then we can omit them from the restore input. (We have already marked
// them in activeNATChains, so they won't get deleted.)
if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) {
continue
}
// Set up internal traffic handling.
if hasInternalEndpoints {
args = append(args[:0],
@ -1479,6 +1510,7 @@ func (proxier *Proxier) syncProxyRules() {
return
}
success = true
proxier.needFullSync = false
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {

View File

@ -7491,6 +7491,8 @@ func countEndpointsAndComments(iptablesData string, matchEndpoint string) (strin
}
func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.masqueradeAll = true
@ -7582,6 +7584,22 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
}}
}))
fp.syncProxyRules()
firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "203.0.113.4")
assert.Equal(t, "-A KUBE-SEP-RUVVH7YV3PHQBDOS -m tcp -p tcp -j DNAT --to-destination 203.0.113.4:8081", firstEndpoint)
// syncProxyRules will only have output the endpoints for svc3, since the others
// didn't change (and syncProxyRules doesn't automatically do a full resync when you
// cross the largeClusterEndpointsThreshold).
if numEndpoints != 3 {
t.Errorf("Found wrong number of endpoints on partial resync: expected %d, got %d", 3, numEndpoints)
}
if numComments != 0 {
t.Errorf("numComments (%d) != 0 after partial resync when numEndpoints (%d) > threshold (%d)", numComments, expectedEndpoints+3, largeClusterEndpointsThreshold)
}
// Now force a full resync and confirm that it rewrites the older services with
// no comments as well.
fp.forceSyncProxyRules()
expectedEndpoints += 3
firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
@ -7618,12 +7636,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
}}
}))
fp.syncProxyRules()
expectedEndpoints += 1
svc4Endpoint, numEndpoints, _ := countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "-A KUBE-SEP-SU5STNODRYEWJAUF -m tcp -p tcp -j DNAT --to-destination 10.4.0.1:8082", svc4Endpoint, "svc4 endpoint was not created")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", expectedEndpoints, numEndpoints)
// should only sync svc4
if numEndpoints != 1 {
t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", 1, numEndpoints)
}
// In large-cluster mode, if we delete a service, it will not re-sync its chains
@ -7631,12 +7649,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
fp.lastIPTablesCleanup = time.Now()
fp.OnServiceDelete(svc4)
fp.syncProxyRules()
expectedEndpoints -= 1
svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", expectedEndpoints, numEndpoints)
// should only sync svc4, and shouldn't output its endpoints
if numEndpoints != 0 {
t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", 0, numEndpoints)
}
assert.NotContains(t, fp.iptablesData.String(), "-X ", "iptables data unexpectedly contains chain deletions")
@ -7646,15 +7664,24 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", expectedEndpoints, numEndpoints)
if numEndpoints != 0 {
t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", 0, numEndpoints)
}
assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SVC-EBDQOQU5SJFXRIL3", "iptables data does not contain chain deletion")
assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SEP-SU5STNODRYEWJAUF", "iptables data does not contain endpoint deletions")
// force a full sync and count
fp.forceSyncProxyRules()
_, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
}
}
// Test calling syncProxyRules() multiple times with various changes
func TestSyncProxyRulesRepeated(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
@ -7750,7 +7777,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
// Add a new service and its endpoints
// Add a new service and its endpoints. (This will only sync the SVC and SEP rules
// for the new service, not the existing ones.)
makeServiceMap(fp,
makeTestService("ns3", "svc3", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@ -7796,11 +7824,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SEP-UHEGFW77JX3KXTOV - [0:0]
:KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 -j KUBE-SVC-2VJB64SDSIJUP5T6
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
@ -7811,21 +7835,13 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -s 10.0.2.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -m tcp -p tcp -j DNAT --to-destination 10.0.2.1:8080
-A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 -> 10.0.2.1:8080" -j KUBE-SEP-UHEGFW77JX3KXTOV
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Delete a service
// Delete a service. (Won't update the other services.)
fp.OnServiceDelete(svc2)
fp.syncProxyRules()
@ -7845,12 +7861,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SEP-UHEGFW77JX3KXTOV - [0:0]
:KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
@ -7858,21 +7870,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
-X KUBE-SEP-UHEGFW77JX3KXTOV
-X KUBE-SVC-2VJB64SDSIJUP5T6
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Add a service, sync, then add its endpoints
// Add a service, sync, then add its endpoints. (The first sync will be a no-op other
// than adding the REJECT rule. The second sync will create the new service.)
makeServiceMap(fp,
makeTestService("ns4", "svc4", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@ -7902,10 +7907,6 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
@ -7913,17 +7914,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
populateEndpointSlices(fp,
makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) {
@ -7956,11 +7949,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
@ -7971,21 +7960,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Change an endpoint of an existing service
// Change an endpoint of an existing service. This will cause its SVC and SEP
// chains to be rewritten.
eps3update := eps3.DeepCopy()
eps3update.Endpoints[0].Addresses[0] = "10.0.3.2"
fp.OnEndpointSliceUpdate(eps3, eps3update)
@ -8007,13 +7989,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-DKCFIS26GWF2WLWC - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
@ -8022,24 +8000,16 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -j KUBE-SEP-DKCFIS26GWF2WLWC
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
-X KUBE-SEP-BSWRHOQ77KEXZLNL
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Add an endpoint to a service
// Add an endpoint to a service. This will cause its SVC and SEP chains to be rewritten.
eps3update2 := eps3update.DeepCopy()
eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}})
fp.OnEndpointSliceUpdate(eps3update, eps3update2)
@ -8061,13 +8031,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-DKCFIS26GWF2WLWC - [0:0]
:KUBE-SEP-JVVZVJ7BSEPPRNBS - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
@ -8076,26 +8042,18 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Sync with no new changes...
// Sync with no new changes... This will not rewrite any SVC or SEP chains
fp.syncProxyRules()
expected = dedent.Dedent(`
@ -8114,13 +8072,6 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-DKCFIS26GWF2WLWC - [0:0]
:KUBE-SEP-JVVZVJ7BSEPPRNBS - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
@ -8129,24 +8080,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
}
func TestNoEndpointsMetric(t *testing.T) {

View File

@ -327,6 +327,20 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
return len(sct.items) > 0
}
// PendingChanges returns a set whose keys are the names of the services that have changed
// since the last time sct was used to update a ServiceMap. (You must call this _before_
// calling sm.Update(sct).)
func (sct *ServiceChangeTracker) PendingChanges() sets.String {
sct.lock.Lock()
defer sct.lock.Unlock()
changes := sets.NewString()
for name := range sct.items {
changes.Insert(name.String())
}
return changes
}
// UpdateServiceMapResult is the updated results after applying service changes.
type UpdateServiceMapResult struct {
// HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node.

View File

@ -564,6 +564,10 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
)
// Headless service should be ignored
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
@ -591,6 +595,10 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
}),
)
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
@ -651,6 +659,18 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
for i := range services {
fp.addService(services[i])
}
pending := fp.serviceChanges.PendingChanges()
for i := range services {
name := services[i].Namespace + "/" + services[i].Name
if !pending.Has(name) {
t.Errorf("expected pending change for %q", name)
}
}
if pending.Len() != len(services) {
t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 8 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@ -684,6 +704,10 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
fp.deleteService(services[2])
fp.deleteService(services[3])
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 4 {
t.Errorf("expected 4 pending service changes, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
@ -733,6 +757,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
fp.addService(servicev1)
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@ -747,6 +775,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// Change service to load-balancer
fp.updateService(servicev1, servicev2)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@ -761,6 +793,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// No change; make sure the service map stays the same and there are
// no health-check changes
fp.updateService(servicev2, servicev2)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@ -774,6 +810,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// And back to ClusterIP
fp.updateService(servicev2, servicev1)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)