Merge pull request #110268 from danwinship/minimize-iptables-changes

minimize iptables-restore input
Kubernetes Prow Robot 2022-11-01 18:06:46 -07:00 committed by GitHub
commit 3edbebe348
11 changed files with 453 additions and 165 deletions
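In short: when the new MinimizeIPTablesRestore gate is enabled and no full sync is pending, syncProxyRules only regenerates the per-service SVC/SVL/EXT/FW/SEP chains for services whose Service or EndpointSlice objects have changed since the last sync; everything else is omitted from the iptables-restore input. A condensed sketch of the gating logic, simplified from the proxier.go hunks below (not itself part of the diff):

    tryPartialSync := !proxier.needFullSync && utilfeature.DefaultFeatureGate.Enabled(features.MinimizeIPTablesRestore)
    var serviceChanged, endpointsChanged sets.String
    if tryPartialSync {
        // Must be read before the Update calls below flush the change trackers.
        serviceChanged = proxier.serviceChanges.PendingChanges()
        endpointsChanged = proxier.endpointsChanges.PendingChanges()
    }
    // ...later, once per service:
    if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) {
        continue // chains unchanged since the last sync; omit them from the restore input
    }

If a partial restore fails, the proxier increments a new failure metric and falls back to a full sync on the next attempt.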

View File

@ -557,6 +557,13 @@ const (
// Enable MinDomains in Pod Topology Spread.
MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread"
// owner: @danwinship
// kep: https://kep.k8s.io/3453
// alpha: v1.26
//
// Enables new performance-improving code in kube-proxy iptables mode
MinimizeIPTablesRestore featuregate.Feature = "MinimizeIPTablesRestore"
// owner: @janosi @bridgetkromhout
// kep: https://kep.k8s.io/1435
// alpha: v1.20
@ -959,6 +966,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Beta},
MinimizeIPTablesRestore: {Default: false, PreRelease: featuregate.Alpha},
MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta},
MultiCIDRRangeAllocator: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -302,6 +302,24 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
return changeNeeded
}
// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointChangeTracker) PendingChanges() sets.String {
if ect.endpointSliceCache != nil {
return ect.endpointSliceCache.pendingChanges()
}
ect.lock.Lock()
defer ect.lock.Unlock()
changes := sets.NewString()
for name := range ect.items {
changes.Insert(name.String())
}
return changes
}
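For context, a sketch of the intended call order (mirroring how the proxier uses this later in the diff): PendingChanges must be read before Update, because Update checks out and applies the pending changes, leaving the set empty afterwards.

    changedEndpoints := proxier.endpointsChanges.PendingChanges()
    endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
    // changedEndpoints now names the services whose chains need rewriting;
    // services not in the set can be omitted from the iptables-restore input.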
// checkoutChanges returns a list of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {

View File

@ -825,6 +825,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints []ServiceEndpoint
expectedStaleServiceNames map[ServicePortName]bool
expectedHealthchecks map[types.NamespacedName]int
expectedChangedEndpoints sets.String
}{{
name: "empty",
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{},
@ -832,6 +833,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, unnamed port",
previousEndpoints: []*v1.Endpoints{
@ -853,6 +855,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, named port, local",
previousEndpoints: []*v1.Endpoints{
@ -876,6 +879,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets",
previousEndpoints: []*v1.Endpoints{
@ -903,6 +907,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets, multiple ports, local",
previousEndpoints: []*v1.Endpoints{
@ -938,6 +943,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple endpoints, subsets, IPs, and ports",
previousEndpoints: []*v1.Endpoints{
@ -1006,6 +1012,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "add an Endpoints",
previousEndpoints: []*v1.Endpoints{
@ -1027,6 +1034,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an Endpoints",
previousEndpoints: []*v1.Endpoints{
@ -1047,6 +1055,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add an IP and port",
previousEndpoints: []*v1.Endpoints{
@ -1077,6 +1086,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an IP and port",
previousEndpoints: []*v1.Endpoints{
@ -1112,6 +1122,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add a subset",
previousEndpoints: []*v1.Endpoints{
@ -1140,6 +1151,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove a subset",
previousEndpoints: []*v1.Endpoints{
@ -1167,6 +1179,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "rename a port",
previousEndpoints: []*v1.Endpoints{
@ -1192,7 +1205,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "renumber a port",
previousEndpoints: []*v1.Endpoints{
@ -1217,6 +1231,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "complex add and remove",
previousEndpoints: []*v1.Endpoints{
@ -1292,6 +1307,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
}, {
name: "change from 0 endpoint address to 1 unnamed port",
previousEndpoints: []*v1.Endpoints{
@ -1310,7 +1326,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
},
}
@ -1346,6 +1363,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
fp.updateEndpoints(prev, curr)
}
}
pendingChanges := fp.endpointsChanges.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.List(), pendingChanges.List())
}
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMapsStr(t, newMap, tc.expectedResult)
@ -1520,13 +1543,14 @@ func TestEndpointSliceUpdate(t *testing.T) {
fqdnSlice.AddressType = discovery.AddressTypeFQDN
testCases := map[string]struct {
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
expectedChangedEndpoints sets.String
}{
// test starting from an empty state
"add a simple slice that doesn't already exist": {
@ -1548,30 +1572,33 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test no modification to state - current change should be nil as nothing changes
"add the same slice that already exists": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// ensure that only valid address types are processed
"add an FQDN slice (invalid address type)": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// test additions to existing state
"add a slice that overlaps with existing state": {
@ -1604,6 +1631,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test additions to existing state with partially overlapping slices and ports
"add a slice that overlaps with existing state and partial ports": {
@ -1634,6 +1662,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test deletions from existing state with partially overlapping slices and ports
"remove a slice that overlaps with existing state": {
@ -1656,6 +1685,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// ensure a removal that has no effect turns into a no-op
"remove a slice that doesn't even exist in current state": {
@ -1663,12 +1693,13 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// start with all endpoints ready, transition to no endpoints ready
"transition all endpoints to unready state": {
@ -1692,6 +1723,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with no endpoints ready, transition to all endpoints ready
"transition all endpoints to ready state": {
@ -1713,6 +1745,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to more endpoints ready
"transition some endpoints to ready state": {
@ -1741,6 +1774,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to some terminating
"transition some endpoints to terminating state": {
@ -1769,6 +1803,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: true},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
}
@ -1783,6 +1818,12 @@ func TestEndpointSliceUpdate(t *testing.T) {
if tc.endpointChangeTracker.items == nil {
t.Errorf("Expected ect.items to not be nil")
}
pendingChanges := tc.endpointChangeTracker.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.List(), pendingChanges.List())
}
changes := tc.endpointChangeTracker.checkoutChanges()
if tc.expectedCurrentChange == nil {
if len(changes) != 0 {

View File

@ -188,6 +188,21 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return changed
}
// pendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time checkoutChanges was called
func (cache *EndpointSliceCache) pendingChanges() sets.String {
cache.lock.Lock()
defer cache.lock.Unlock()
changes := sets.NewString()
for serviceNN, esTracker := range cache.trackerByServiceMap {
if len(esTracker.pending) > 0 {
changes.Insert(serviceNN.String())
}
}
return changes
}
// checkoutChanges returns a list of all endpointsChanges that are
// pending and then marks them as applied.
func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {

View File

@ -38,9 +38,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
@ -163,6 +165,7 @@ type Proxier struct {
// updating iptables with some partial data after kube-proxy restart.
endpointSlicesSynced bool
servicesSynced bool
needFullSync bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
syncPeriod time.Duration
@ -298,7 +301,7 @@ func NewProxier(ipt utiliptables.Interface,
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
proxier.syncProxyRules, syncPeriod, wait.NeverStop)
proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
if ipt.HasRandomFully() {
klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
@ -539,7 +542,7 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
proxier.forceSyncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
@ -575,7 +578,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
proxier.forceSyncProxyRules()
}
// OnNodeAdd is called whenever creation of new node object
@ -596,6 +599,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
@ -620,6 +624,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
@ -636,6 +641,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.Sync()
@ -769,6 +775,17 @@ func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string
return append(args, "-m", "comment", "--comment", svcName)
}
// Called by the iptables.Monitor, and in response to topology changes; this calls
// syncProxyRules() and tells it to resync all services, regardless of whether the
// Service or Endpoints/EndpointSlice objects themselves have changed
func (proxier *Proxier) forceSyncProxyRules() {
proxier.mu.Lock()
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.syncProxyRules()
}
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are set up in iptablesInit()
// This assumes proxier.mu is NOT held
@ -789,9 +806,12 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
}()
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
tryPartialSync := !proxier.needFullSync && utilfeature.DefaultFeatureGate.Enabled(features.MinimizeIPTablesRestore)
var serviceChanged, endpointsChanged sets.String
if tryPartialSync {
serviceChanged = proxier.serviceChanges.PendingChanges()
endpointsChanged = proxier.endpointsChanges.PendingChanges()
}
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
@ -826,6 +846,13 @@ func (proxier *Proxier) syncProxyRules() {
if !success {
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
if tryPartialSync {
metrics.IptablesPartialRestoreFailuresTotal.Inc()
}
// proxier.serviceChanges and proxier.endpointsChanges have already
// been flushed, so we've lost the state needed to be able to do
// a partial sync.
proxier.needFullSync = true
}
}()
@ -1184,6 +1211,13 @@ func (proxier *Proxier) syncProxyRules() {
)
}
// If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync
// then we can omit them from the restore input. (We have already marked
// them in activeNATChains, so they won't get deleted.)
if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) {
continue
}
// Set up internal traffic handling.
if hasInternalEndpoints {
args = append(args[:0],
@ -1479,6 +1513,7 @@ func (proxier *Proxier) syncProxyRules() {
return
}
success = true
proxier.needFullSync = false
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {

View File

@ -1343,8 +1343,10 @@ func getLine() int {
}
// assertIPTablesRulesEqual asserts that the generated rules in result match the rules in
// expected, ignoring irrelevant ordering differences.
func assertIPTablesRulesEqual(t *testing.T, line int, expected, result string) {
// expected, ignoring irrelevant ordering differences. By default this also checks the
// rules for consistency (e.g., no jumps to chains that aren't defined), but that can be
// disabled by passing false for checkConsistency if you are passing a partial set of rules.
func assertIPTablesRulesEqual(t *testing.T, line int, checkConsistency bool, expected, result string) {
expected = strings.TrimLeft(expected, " \t\n")
result, err := sortIPTablesRules(strings.TrimLeft(result, " \t\n"))
@ -1360,9 +1362,11 @@ func assertIPTablesRulesEqual(t *testing.T, line int, expected, result string) {
t.Errorf("rules do not match%s:\ndiff:\n%s\nfull result:\n```\n%s```", lineStr, diff, result)
}
err = checkIPTablesRuleJumps(expected)
if err != nil {
t.Fatalf("%s", err)
if checkConsistency {
err = checkIPTablesRuleJumps(expected)
if err != nil {
t.Fatalf("%s%s", err, lineStr)
}
}
}
@ -2037,7 +2041,7 @@ func TestOverallIPTablesRulesWithMultipleServices(t *testing.T) {
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
natRulesMetric, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)))
if err != nil {
@ -2099,7 +2103,7 @@ func TestClusterIPReject(t *testing.T) {
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2182,7 +2186,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.0.1:80" -j KUBE-SEP-SXIVWICOYRO3J4NJ
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2303,7 +2307,7 @@ func TestLoadBalancer(t *testing.T) {
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2491,7 +2495,7 @@ func TestNodePort(t *testing.T) {
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.0.1:80" -j KUBE-SEP-SXIVWICOYRO3J4NJ
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2586,7 +2590,7 @@ func TestHealthCheckNodePort(t *testing.T) {
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2648,7 +2652,7 @@ func TestMasqueradeRule(t *testing.T) {
} else {
expected = fmt.Sprintf(expectedFmt, "")
}
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
}
}
@ -2704,7 +2708,7 @@ func TestExternalIPsReject(t *testing.T) {
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2813,7 +2817,7 @@ func TestOnlyLocalExternalIPs(t *testing.T) {
-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2921,7 +2925,7 @@ func TestNonLocalExternalIPs(t *testing.T) {
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -2994,7 +2998,7 @@ func TestNodePortReject(t *testing.T) {
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -3086,7 +3090,7 @@ func TestLoadBalancerReject(t *testing.T) {
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -3219,7 +3223,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
runPacketFlowTests(t, getLine(), ipt, testNodeIP, []packetFlowTest{
{
@ -3395,7 +3399,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
fp.syncProxyRules()
assertIPTablesRulesEqual(t, line, expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, line, true, expected, fp.iptablesData.String())
runPacketFlowTests(t, line, ipt, testNodeIP, []packetFlowTest{
{
@ -4742,7 +4746,7 @@ func TestEndpointSliceE2E(t *testing.T) {
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
assertIPTablesRulesEqual(t, getLine(), expectedIPTablesWithSlice, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expectedIPTablesWithSlice, fp.iptablesData.String())
fp.OnEndpointSliceDelete(endpointSlice)
fp.syncProxyRules()
@ -5345,7 +5349,7 @@ func TestInternalTrafficPolicyE2E(t *testing.T) {
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
assertIPTablesRulesEqual(t, tc.line, tc.expectedIPTablesWithSlice, fp.iptablesData.String())
assertIPTablesRulesEqual(t, tc.line, true, tc.expectedIPTablesWithSlice, fp.iptablesData.String())
runPacketFlowTests(t, tc.line, ipt, testNodeIP, tc.flowTests)
fp.OnEndpointSliceDelete(endpointSlice)
@ -6122,14 +6126,14 @@ func TestEndpointSliceWithTerminatingEndpointsTrafficPolicyLocal(t *testing.T) {
fp.OnEndpointSliceAdd(testcase.endpointslice)
fp.syncProxyRules()
assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String())
assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String())
runPacketFlowTests(t, testcase.line, ipt, testNodeIP, testcase.flowTests)
fp.OnEndpointSliceDelete(testcase.endpointslice)
fp.syncProxyRules()
if testcase.noUsableEndpoints {
// Deleting the EndpointSlice should have had no effect
assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String())
assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String())
} else {
assertIPTablesRulesNotEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String())
}
@ -6868,14 +6872,14 @@ func TestEndpointSliceWithTerminatingEndpointsTrafficPolicyCluster(t *testing.T)
fp.OnEndpointSliceAdd(testcase.endpointslice)
fp.syncProxyRules()
assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String())
assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String())
runPacketFlowTests(t, testcase.line, ipt, testNodeIP, testcase.flowTests)
fp.OnEndpointSliceDelete(testcase.endpointslice)
fp.syncProxyRules()
if testcase.noUsableEndpoints {
// Deleting the EndpointSlice should have had no effect
assertIPTablesRulesEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String())
assertIPTablesRulesEqual(t, testcase.line, true, testcase.expectedIPTables, fp.iptablesData.String())
} else {
assertIPTablesRulesNotEqual(t, testcase.line, testcase.expectedIPTables, fp.iptablesData.String())
}
@ -7487,6 +7491,8 @@ func countEndpointsAndComments(iptablesData string, matchEndpoint string) (strin
}
func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.masqueradeAll = true
@ -7578,6 +7584,22 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
}}
}))
fp.syncProxyRules()
firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "203.0.113.4")
assert.Equal(t, "-A KUBE-SEP-RUVVH7YV3PHQBDOS -m tcp -p tcp -j DNAT --to-destination 203.0.113.4:8081", firstEndpoint)
// syncProxyRules will only have output the endpoints for svc3, since the others
// didn't change (and syncProxyRules doesn't automatically do a full resync when you
// cross the largeClusterEndpointsThreshold).
if numEndpoints != 3 {
t.Errorf("Found wrong number of endpoints on partial resync: expected %d, got %d", 3, numEndpoints)
}
if numComments != 0 {
t.Errorf("numComments (%d) != 0 after partial resync when numEndpoints (%d) > threshold (%d)", numComments, expectedEndpoints+3, largeClusterEndpointsThreshold)
}
// Now force a full resync and confirm that it rewrites the older services with
// no comments as well.
fp.forceSyncProxyRules()
expectedEndpoints += 3
firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
@ -7614,12 +7636,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
}}
}))
fp.syncProxyRules()
expectedEndpoints += 1
svc4Endpoint, numEndpoints, _ := countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "-A KUBE-SEP-SU5STNODRYEWJAUF -m tcp -p tcp -j DNAT --to-destination 10.4.0.1:8082", svc4Endpoint, "svc4 endpoint was not created")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", expectedEndpoints, numEndpoints)
// should only sync svc4
if numEndpoints != 1 {
t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", 1, numEndpoints)
}
// In large-cluster mode, if we delete a service, it will not re-sync its chains
@ -7627,12 +7649,12 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
fp.lastIPTablesCleanup = time.Now()
fp.OnServiceDelete(svc4)
fp.syncProxyRules()
expectedEndpoints -= 1
svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", expectedEndpoints, numEndpoints)
// should only sync svc4, and shouldn't output its endpoints
if numEndpoints != 0 {
t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", 0, numEndpoints)
}
assert.NotContains(t, fp.iptablesData.String(), "-X ", "iptables data unexpectedly contains chain deletions")
@ -7642,17 +7664,27 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", expectedEndpoints, numEndpoints)
if numEndpoints != 0 {
t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", 0, numEndpoints)
}
assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SVC-EBDQOQU5SJFXRIL3", "iptables data does not contain chain deletion")
assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SEP-SU5STNODRYEWJAUF", "iptables data does not contain endpoint deletions")
// force a full sync and count
fp.forceSyncProxyRules()
_, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
}
}
// Test calling syncProxyRules() multiple times with various changes
func TestSyncProxyRulesRepeated(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MinimizeIPTablesRestore, true)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
metrics.RegisterMetrics()
// Create initial state
var svc2 *v1.Service
@ -7744,9 +7776,10 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
// Add a new service and its endpoints
// Add a new service and its endpoints. (This will only sync the SVC and SEP rules
// for the new service, not the existing ones.)
makeServiceMap(fp,
makeTestService("ns3", "svc3", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@ -7792,11 +7825,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SEP-UHEGFW77JX3KXTOV - [0:0]
:KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 -j KUBE-SVC-2VJB64SDSIJUP5T6
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
@ -7807,21 +7836,13 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -s 10.0.2.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-UHEGFW77JX3KXTOV -m comment --comment ns2/svc2:p8080 -m tcp -p tcp -j DNAT --to-destination 10.0.2.1:8080
-A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 8080 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-2VJB64SDSIJUP5T6 -m comment --comment "ns2/svc2:p8080 -> 10.0.2.1:8080" -j KUBE-SEP-UHEGFW77JX3KXTOV
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Delete a service
// Delete a service. (Won't update the other services.)
fp.OnServiceDelete(svc2)
fp.syncProxyRules()
@ -7841,12 +7862,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SEP-UHEGFW77JX3KXTOV - [0:0]
:KUBE-SVC-2VJB64SDSIJUP5T6 - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
@ -7854,23 +7871,18 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
-X KUBE-SEP-UHEGFW77JX3KXTOV
-X KUBE-SVC-2VJB64SDSIJUP5T6
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Add a service, sync, then add its endpoints
// Add a service, sync, then add its endpoints. (The first sync will be a no-op other
// than adding the REJECT rule. The second sync will create the new service.)
var svc4 *v1.Service
makeServiceMap(fp,
makeTestService("ns4", "svc4", func(svc *v1.Service) {
svc4 = svc
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.30.0.44"
svc.Spec.Ports = []v1.ServicePort{{
@ -7898,10 +7910,6 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
@ -7909,17 +7917,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
populateEndpointSlices(fp,
makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) {
@ -7952,11 +7952,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
@ -7967,21 +7963,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -s 10.0.3.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-BSWRHOQ77KEXZLNL -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.1:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.1:80" -j KUBE-SEP-BSWRHOQ77KEXZLNL
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Change an endpoint of an existing service
// Change an endpoint of an existing service. This will cause its SVC and SEP
// chains to be rewritten.
eps3update := eps3.DeepCopy()
eps3update.Endpoints[0].Addresses[0] = "10.0.3.2"
fp.OnEndpointSliceUpdate(eps3, eps3update)
@ -8003,13 +7992,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-BSWRHOQ77KEXZLNL - [0:0]
:KUBE-SEP-DKCFIS26GWF2WLWC - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
@ -8018,24 +8003,16 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -j KUBE-SEP-DKCFIS26GWF2WLWC
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
-X KUBE-SEP-BSWRHOQ77KEXZLNL
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Add an endpoint to a service
// Add an endpoint to a service. This will cause its SVC and SEP chains to be rewritten.
eps3update2 := eps3update.DeepCopy()
eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}})
fp.OnEndpointSliceUpdate(eps3update, eps3update2)
@ -8057,13 +8034,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-DKCFIS26GWF2WLWC - [0:0]
:KUBE-SEP-JVVZVJ7BSEPPRNBS - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
@ -8072,26 +8045,95 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Sync with no new changes...
// Sync with no new changes... This will not rewrite any SVC or SEP chains
fp.syncProxyRules()
expected = dedent.Dedent(`
*filter
:KUBE-NODEPORTS - [0:0]
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-NODEPORTS - [0:0]
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Now force a partial resync error and ensure that it recovers correctly
if fp.needFullSync {
t.Fatalf("Proxier unexpectedly already needs a full sync?")
}
prFailures, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
if err != nil {
t.Fatalf("Could not get partial restore failures metric: %v", err)
}
if prFailures != 0.0 {
t.Errorf("Already did a partial resync? Something failed earlier!")
}
// Add a rule jumping from svc3's service chain to svc4's endpoint, then try to
// delete svc4. This will fail because the partial resync won't rewrite svc3's
// rules and so the partial restore would leave a dangling jump from there to
// svc4's endpoint. The proxier will then queue a full resync in response to the
// partial resync failure, and the full resync will succeed (since it will rewrite
// svc3's rules as well).
//
// This is an absurd scenario, but it has to be; partial resync failures are
// supposed to be impossible; if we knew of any non-absurd scenario that would
// cause such a failure, then that would be a bug and we would fix it.
if _, err := fp.iptables.ChainExists(utiliptables.TableNAT, utiliptables.Chain("KUBE-SEP-AYCN5HPXMIRJNJXU")); err != nil {
t.Fatalf("svc4's endpoint chain unexpected already does not exist!")
}
if _, err := fp.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.Chain("KUBE-SVC-X27LE4BHSL4DOUIK"), "-j", "KUBE-SEP-AYCN5HPXMIRJNJXU"); err != nil {
t.Fatalf("Could not add bad iptables rule: %v", err)
}
fp.OnServiceDelete(svc4)
fp.syncProxyRules()
if _, err := fp.iptables.ChainExists(utiliptables.TableNAT, utiliptables.Chain("KUBE-SEP-AYCN5HPXMIRJNJXU")); err != nil {
t.Errorf("svc4's endpoint chain was successfully deleted despite dangling references!")
}
if !fp.needFullSync {
t.Errorf("Proxier did not fail on previous partial resync?")
}
updatedPRFailures, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
if err != nil {
t.Errorf("Could not get partial restore failures metric: %v", err)
}
if updatedPRFailures != prFailures+1.0 {
t.Errorf("Partial restore failures metric was not incremented after failed partial resync (expected %.02f, got %.02f)", prFailures+1.0, updatedPRFailures)
}
// On retry we should do a full resync, which should succeed (and delete svc4)
fp.syncProxyRules()
expected = dedent.Dedent(`
@ -8119,30 +8161,27 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j KUBE-SVC-4SW47YFZTEDKD3PK
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -s 10.0.4.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-AYCN5HPXMIRJNJXU -m comment --comment ns4/svc4:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.4.1:80
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 cluster IP" -m tcp -p tcp -d 172.30.0.44 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-4SW47YFZTEDKD3PK -m comment --comment "ns4/svc4:p80 -> 10.0.4.1:80" -j KUBE-SEP-AYCN5HPXMIRJNJXU
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
-X KUBE-SEP-AYCN5HPXMIRJNJXU
-X KUBE-SVC-4SW47YFZTEDKD3PK
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), expected, fp.iptablesData.String())
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
}
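To summarize the mechanism this test exercises, here is a minimal, hypothetical sketch (not the proxier's actual code) of how a failed partial restore falls back to a full resync. needFullSync, IptablesPartialRestoreFailuresTotal, IptablesRestoreFailuresTotal, and the RestoreAll call appear elsewhere in this change; sketchProxier and buildRules are invented for illustration.
// Sketch only: a hypothetical proxier showing the fallback path exercised by
// TestSyncProxyRulesRepeated above. Real kube-proxy field and helper names may
// differ; needFullSync and the two metrics are the ones used in this change.
package sketch

import (
	"k8s.io/kubernetes/pkg/proxy/metrics"
	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)

type sketchProxier struct {
	iptables     utiliptables.Interface
	needFullSync bool
	// buildRules is assumed to return iptables-restore input; when partial is
	// true it includes only the chains whose services/endpoints changed.
	buildRules func(partial bool) []byte
}

func (p *sketchProxier) syncProxyRules() {
	partial := !p.needFullSync
	data := p.buildRules(partial)
	if err := p.iptables.RestoreAll(data, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
		if partial {
			// A dangling jump to a deleted chain (as constructed in the test)
			// makes the partial restore fail; record it and force a full
			// resync on the next pass.
			metrics.IptablesPartialRestoreFailuresTotal.Inc()
			p.needFullSync = true
			return
		}
		metrics.IptablesRestoreFailuresTotal.Inc()
		return
	}
	p.needFullSync = false
}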
func TestNoEndpointsMetric(t *testing.T) {

View File

@@ -126,6 +126,17 @@ var (
},
)
// IptablesPartialRestoreFailuresTotal is the number of iptables *partial* restore
// failures (resulting in a fall back to a full restore) that the proxy has seen.
IptablesPartialRestoreFailuresTotal = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: kubeProxySubsystem,
Name: "sync_proxy_rules_iptables_partial_restore_failures_total",
Help: "Cumulative proxy iptables partial restore failures",
StabilityLevel: metrics.ALPHA,
},
)
// IptablesRulesTotal is the number of iptables rules that the iptables proxy installs.
IptablesRulesTotal = metrics.NewGaugeVec(
&metrics.GaugeOpts{
@@ -177,6 +188,7 @@ func RegisterMetrics() {
legacyregistry.MustRegister(ServiceChangesTotal)
legacyregistry.MustRegister(IptablesRulesTotal)
legacyregistry.MustRegister(IptablesRestoreFailuresTotal)
legacyregistry.MustRegister(IptablesPartialRestoreFailuresTotal)
legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp)
legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal)
})
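As a hedged usage note, the new counter can be read in tests with testutil.GetCounterMetricValue, the same helper the proxier test above uses. This sketch (the helper name is invented, and RegisterMetrics is assumed to be safe to call repeatedly) only shows the before/after comparison pattern:
// Sketch only: comparing the partial-restore-failure counter before and after
// an operation. GetCounterMetricValue and RegisterMetrics exist in the tree;
// checkPartialRestoreFailureBumped is a hypothetical test helper.
package sketch

import (
	"testing"

	"k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/proxy/metrics"
)

func checkPartialRestoreFailureBumped(t *testing.T, before float64) {
	metrics.RegisterMetrics() // assumed safe to call more than once
	after, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
	if err != nil {
		t.Fatalf("could not get partial restore failures metric: %v", err)
	}
	if after != before+1 {
		t.Errorf("expected partial restore failures to be %.02f, got %.02f", before+1, after)
	}
}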

View File

@@ -328,6 +328,20 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
return len(sct.items) > 0
}
// PendingChanges returns a set whose keys are the names of the services that have changed
// since the last time sct was used to update a ServiceMap. (You must call this _before_
// calling sm.Update(sct).)
func (sct *ServiceChangeTracker) PendingChanges() sets.String {
sct.lock.Lock()
defer sct.lock.Unlock()
changes := sets.NewString()
for name := range sct.items {
changes.Insert(name.String())
}
return changes
}
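To illustrate how PendingChanges is meant to be used (capture the changed set before Update, since Update applies and clears the tracker), here is a small sketch. partialSync and writeRulesFor are hypothetical; PendingChanges and the ServiceMap.Update signature match the code and tests shown in this change.
// Sketch only: driving a partial rule rewrite from PendingChanges.
// partialSync and writeRulesFor are hypothetical helpers.
package sketch

import (
	"k8s.io/kubernetes/pkg/proxy"
)

func partialSync(changes *proxy.ServiceChangeTracker, sm proxy.ServiceMap, writeRulesFor func(svcName string)) {
	// Capture the changed services before Update(), which applies and clears
	// the pending changes.
	changed := changes.PendingChanges()
	sm.Update(changes)
	for _, name := range changed.List() {
		writeRulesFor(name) // only regenerate rules for services that changed
	}
}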
// UpdateServiceMapResult is the updated results after applying service changes.
type UpdateServiceMapResult struct {
// HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node.

View File

@@ -564,6 +564,10 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
)
// Headless service should be ignored
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
@@ -591,6 +595,10 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
}),
)
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
@@ -651,6 +659,18 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
for i := range services {
fp.addService(services[i])
}
pending := fp.serviceChanges.PendingChanges()
for i := range services {
name := services[i].Namespace + "/" + services[i].Name
if !pending.Has(name) {
t.Errorf("expected pending change for %q", name)
}
}
if pending.Len() != len(services) {
t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 8 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@@ -684,6 +704,10 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
fp.deleteService(services[2])
fp.deleteService(services[3])
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 4 {
t.Errorf("expected 4 pending service changes, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
@@ -733,6 +757,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
fp.addService(servicev1)
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@@ -747,6 +775,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// Change service to load-balancer
fp.updateService(servicev1, servicev2)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@@ -761,6 +793,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// No change; make sure the service map stays the same and there are
// no health-check changes
fp.updateService(servicev2, servicev2)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
@@ -774,6 +810,10 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// And back to ClusterIP
fp.updateService(servicev2, servicev1)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)

View File

@@ -22,6 +22,7 @@ import (
"strings"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/iptables"
)
@@ -217,16 +218,22 @@ func (f *FakeIPTables) SaveInto(table iptables.Table, buffer *bytes.Buffer) erro
return f.saveTable(table, buffer)
}
func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error {
// This is not a complete list but it's enough to pass the unit tests
var builtinTargets = sets.NewString("ACCEPT", "DROP", "RETURN", "REJECT", "DNAT", "SNAT", "MASQUERADE", "MARK")
func (f *FakeIPTables) restoreTable(newDump *IPTablesDump, newTable *Table, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error {
oldTable, err := f.Dump.GetTable(newTable.Name)
if err != nil {
return err
}
backupChains := make([]Chain, len(oldTable.Chains))
copy(backupChains, oldTable.Chains)
// Update internal state
if flush == iptables.FlushTables {
oldTable.Chains = make([]Chain, 0, len(newTable.Chains))
}
for _, newChain := range newTable.Chains {
oldChain, _ := f.Dump.GetChain(newTable.Name, newChain.Name)
switch {
@@ -235,7 +242,6 @@ func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, c
case oldChain == nil && !newChain.Deleted:
oldTable.Chains = append(oldTable.Chains, newChain)
case oldChain != nil && newChain.Deleted:
// FIXME: should make sure chain is not referenced from other jumps
_ = f.DeleteChain(newTable.Name, newChain.Name)
case oldChain != nil && !newChain.Deleted:
// replace old data with new
@@ -246,6 +252,35 @@ func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, c
}
}
}
// Now check that all old/new jumps are valid
for _, chain := range oldTable.Chains {
for _, rule := range chain.Rules {
if rule.Jump == nil {
continue
}
if builtinTargets.Has(rule.Jump.Value) {
continue
}
jumpedChain, _ := f.Dump.GetChain(oldTable.Name, iptables.Chain(rule.Jump.Value))
if jumpedChain == nil {
newChain, _ := newDump.GetChain(oldTable.Name, iptables.Chain(rule.Jump.Value))
if newChain != nil {
// rule is an old rule that jumped to a chain which
// was deleted by newDump.
oldTable.Chains = backupChains
return fmt.Errorf("deleted chain %q is referenced by existing rules", newChain.Name)
} else {
// rule is a new rule that jumped to a chain that was
// neither created nor pre-existing
oldTable.Chains = backupChains
return fmt.Errorf("rule %q jumps to a non-existent chain", rule.Raw)
}
}
}
}
return nil
}
@@ -261,7 +296,7 @@ func (f *FakeIPTables) Restore(table iptables.Table, data []byte, flush iptables
return err
}
return f.restoreTable(newTable, flush, counters)
return f.restoreTable(dump, newTable, flush, counters)
}
// RestoreAll is part of iptables.Interface
@@ -272,7 +307,7 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter
}
for i := range dump.Tables {
err = f.restoreTable(&dump.Tables[i], flush, counters)
err = f.restoreTable(dump, &dump.Tables[i], flush, counters)
if err != nil {
return err
}

View File

@@ -170,10 +170,12 @@ func TestFakeIPTables(t *testing.T) {
*nat
:KUBE-RESTORED - [0:0]
:KUBE-MISC-CHAIN - [0:0]
:KUBE-MISC-TWO - [0:0]
:KUBE-EMPTY - [0:0]
-A KUBE-RESTORED -m comment --comment "restored chain" -j ACCEPT
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
-A KUBE-MISC-TWO -j ACCEPT
COMMIT
`, "\n"))
err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.NoRestoreCounters)
@@ -196,11 +198,13 @@ func TestFakeIPTables(t *testing.T) {
:KUBE-TEST - [0:0]
:KUBE-RESTORED - [0:0]
:KUBE-MISC-CHAIN - [0:0]
:KUBE-MISC-TWO - [0:0]
:KUBE-EMPTY - [0:0]
-A KUBE-TEST -j ACCEPT
-A KUBE-RESTORED -m comment --comment "restored chain" -j ACCEPT
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
-A KUBE-MISC-TWO -j ACCEPT
COMMIT
*filter
:INPUT - [0:0]
@@ -214,6 +218,30 @@ func TestFakeIPTables(t *testing.T) {
t.Fatalf("bad post-restore dump. expected:\n%s\n\ngot:\n%s\n", expected, buf.Bytes())
}
// Trying to use Restore to delete a chain that another chain jumps to will fail
rules = dedent.Dedent(strings.Trim(`
*nat
:KUBE-MISC-TWO - [0:0]
-X KUBE-MISC-TWO
COMMIT
`, "\n"))
err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.RestoreCounters)
if err == nil || !strings.Contains(err.Error(), "referenced by existing rules") {
t.Fatalf("Expected 'referenced by existing rules' error from Restore, got %v", err)
}
// Trying to use Restore to add a jump to a non-existent chain will fail
rules = dedent.Dedent(strings.Trim(`
*nat
:KUBE-MISC-TWO - [0:0]
-A KUBE-MISC-TWO -j KUBE-MISC-THREE
COMMIT
`, "\n"))
err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.RestoreCounters)
if err == nil || !strings.Contains(err.Error(), "non-existent chain") {
t.Fatalf("Expected 'non-existent chain' error from Restore, got %v", err)
}
// more Restore; empty out one chain and delete another, but also update its counters
rules = dedent.Dedent(strings.Trim(`
*nat
@@ -240,9 +268,11 @@ func TestFakeIPTables(t *testing.T) {
:POSTROUTING - [0:0]
:KUBE-TEST - [99:9999]
:KUBE-MISC-CHAIN - [0:0]
:KUBE-MISC-TWO - [0:0]
:KUBE-EMPTY - [0:0]
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
-A KUBE-MISC-TWO -j ACCEPT
COMMIT
*filter
:INPUT - [0:0]