diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index da61d131248..5649a88b936 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -159,6 +159,7 @@ type Proxier struct { // updating iptables with some partial data after kube-proxy restart. endpointSlicesSynced bool servicesSynced bool + lastFullSync time.Time needFullSync bool initialized int32 syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules @@ -325,7 +326,7 @@ func NewProxier(ctx context.Context, // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to. // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though. // time.Hour is arbitrary. - proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs) go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop) @@ -541,6 +542,7 @@ func (proxier *Proxier) SyncLoop() { // synthesize "last change queued" time as the informers are syncing. metrics.SyncProxyRulesLastQueuedTimestamp.WithLabelValues(string(proxier.ipFamily)).SetToCurrentTime() proxier.syncRunner.Loop(wait.NeverStop) + } func (proxier *Proxier) setInitialized(value bool) { @@ -806,15 +808,14 @@ func (proxier *Proxier) syncProxyRules() { return } - // The value of proxier.needFullSync may change before the defer funcs run, so - // we need to keep track of whether it was set at the *start* of the sync. - tryPartialSync := !proxier.needFullSync - // Keep track of how long syncs take. 
start := time.Now() + + doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod) + defer func() { metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start)) - if tryPartialSync { + if !doFullSync { metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start)) } else { metrics.SyncFullProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start)) @@ -825,24 +826,26 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - proxier.logger.V(2).Info("Syncing iptables rules") + proxier.logger.V(2).Info("Syncing iptables rules", "fullSync", doFullSync) success := false defer func() { if !success { proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod) proxier.syncRunner.RetryAfter(proxier.syncPeriod) - if tryPartialSync { + if !doFullSync { metrics.IPTablesPartialRestoreFailuresTotal.WithLabelValues(string(proxier.ipFamily)).Inc() } // proxier.serviceChanges and proxier.endpointChanges have already // been flushed, so we've lost the state needed to be able to do // a partial sync. proxier.needFullSync = true + } else if doFullSync { + proxier.lastFullSync = time.Now() } }() - if !tryPartialSync { + if doFullSync { // Ensure that our jump rules (eg from PREROUTING to KUBE-SERVICES) exist. // We can't do this as part of the iptables-restore because we don't want // to specify/replace *all* of the rules in PREROUTING, etc. @@ -1235,7 +1238,7 @@ func (proxier *Proxier) syncProxyRules() { // then we can omit them from the restore input. However, we have to still // figure out how many chains we _would_ have written, to make the metrics // come out right, so we just compute them and throw them away. 
- if tryPartialSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) { + if !doFullSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) { natChains = skippedNatChains natRules = skippedNatRules } diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index db993dab5ad..16a78dba78f 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -163,6 +163,7 @@ type Proxier struct { // updating nftables with some partial data after kube-proxy restart. endpointSlicesSynced bool servicesSynced bool + lastFullSync time.Time needFullSync bool initialized int32 syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules @@ -281,7 +282,7 @@ func NewProxier(ctx context.Context, burstSyncs := 2 logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) // We need to pass *some* maxInterval to NewBoundedFrequencyRunner. time.Hour is arbitrary. - proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs) return proxier, nil } @@ -1167,19 +1168,18 @@ func (proxier *Proxier) syncProxyRules() { return } + // Keep track of how long syncs take. + start := time.Now() + // // Below this point we will not return until we try to write the nftables rules. // - // The value of proxier.needFullSync may change before the defer funcs run, so - // we need to keep track of whether it was set at the *start* of the sync. 
- tryPartialSync := !proxier.needFullSync + doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod) - // Keep track of how long syncs take. - start := time.Now() defer func() { metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start)) - if tryPartialSync { + if !doFullSync { metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start)) } else { metrics.SyncFullProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start)) @@ -1190,7 +1190,7 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - proxier.logger.V(2).Info("Syncing nftables rules") + proxier.logger.V(2).Info("Syncing nftables rules", "fullSync", doFullSync) success := false defer func() { @@ -1201,6 +1201,8 @@ func (proxier *Proxier) syncProxyRules() { // been flushed, so we've lost the state needed to be able to do // a partial sync. proxier.needFullSync = true + } else if doFullSync { + proxier.lastFullSync = time.Now() } }() @@ -1228,7 +1230,7 @@ func (proxier *Proxier) syncProxyRules() { // (with a later timestamp) at the end of the sync. proxier.logger.Error(err, "Unable to delete stale chains; will retry later") metrics.NFTablesCleanupFailuresTotal.WithLabelValues(string(proxier.ipFamily)).Inc() - tryPartialSync = false + doFullSync = true // Log failed transaction and list full kube-proxy table. 
proxier.logFailure(tx) @@ -1238,7 +1240,7 @@ func (proxier *Proxier) syncProxyRules() { // Now start the actual syncing transaction tx := proxier.nftables.NewTransaction() - if !tryPartialSync { + if doFullSync { proxier.setupNFTables(tx) } @@ -1286,7 +1288,7 @@ func (proxier *Proxier) syncProxyRules() { // skipServiceUpdate is used for all service-related chains and their elements. // If no changes were done to the service or its endpoints, these objects may be skipped. - skipServiceUpdate := tryPartialSync && + skipServiceUpdate := !doFullSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index d9b13f9a18e..4aa642b2ebb 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "strings" + "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -37,6 +38,9 @@ const ( // IPv6ZeroCIDR is the CIDR block for the whole IPv6 address space IPv6ZeroCIDR = "::/0" + + // FullSyncPeriod is the time since the last successful full sync after which the iptables and nftables proxiers perform a full sync instead of a partial one. + FullSyncPeriod = 1 * time.Hour ) // IsZeroCIDR checks whether the input CIDR string is either