Perform a full proxier sync once per hour via the BoundedFrequencyRunner (BFR)

This commit is contained in:
olderTaoist 2025-01-08 20:00:18 +08:00 committed by mengxiangyong
parent f1b3fdf7e6
commit 561c1d235a
3 changed files with 30 additions and 21 deletions

View File

@ -159,6 +159,7 @@ type Proxier struct {
// updating iptables with some partial data after kube-proxy restart.
endpointSlicesSynced bool
servicesSynced bool
lastFullSync time.Time
needFullSync bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
@ -325,7 +326,7 @@ func NewProxier(ctx context.Context,
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// proxyutil.FullSyncPeriod bounds the time between full resyncs.
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs)
go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
@ -541,6 +542,7 @@ func (proxier *Proxier) SyncLoop() {
// synthesize "last change queued" time as the informers are syncing.
metrics.SyncProxyRulesLastQueuedTimestamp.WithLabelValues(string(proxier.ipFamily)).SetToCurrentTime()
proxier.syncRunner.Loop(wait.NeverStop)
}
func (proxier *Proxier) setInitialized(value bool) {
@ -806,15 +808,14 @@ func (proxier *Proxier) syncProxyRules() {
return
}
// The value of proxier.needFullSync may change before the defer funcs run, so
// we need to keep track of whether it was set at the *start* of the sync.
tryPartialSync := !proxier.needFullSync
// Keep track of how long syncs take.
start := time.Now()
doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod)
defer func() {
metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
if tryPartialSync {
if !doFullSync {
metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
} else {
metrics.SyncFullProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
@ -825,24 +826,26 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
proxier.logger.V(2).Info("Syncing iptables rules")
proxier.logger.V(2).Info("Syncing iptables rules", "fullSync", doFullSync)
success := false
defer func() {
if !success {
proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
if tryPartialSync {
if !doFullSync {
metrics.IPTablesPartialRestoreFailuresTotal.WithLabelValues(string(proxier.ipFamily)).Inc()
}
// proxier.serviceChanges and proxier.endpointChanges have already
// been flushed, so we've lost the state needed to be able to do
// a partial sync.
proxier.needFullSync = true
} else if doFullSync {
proxier.lastFullSync = time.Now()
}
}()
if !tryPartialSync {
if doFullSync {
// Ensure that our jump rules (eg from PREROUTING to KUBE-SERVICES) exist.
// We can't do this as part of the iptables-restore because we don't want
// to specify/replace *all* of the rules in PREROUTING, etc.
@ -1235,7 +1238,7 @@ func (proxier *Proxier) syncProxyRules() {
// then we can omit them from the restore input. However, we have to still
// figure out how many chains we _would_ have written, to make the metrics
// come out right, so we just compute them and throw them away.
if tryPartialSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) {
if !doFullSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) {
natChains = skippedNatChains
natRules = skippedNatRules
}

View File

@ -163,6 +163,7 @@ type Proxier struct {
// updating nftables with some partial data after kube-proxy restart.
endpointSlicesSynced bool
servicesSynced bool
lastFullSync time.Time
needFullSync bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
@ -281,7 +282,7 @@ func NewProxier(ctx context.Context,
burstSyncs := 2
logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner; proxyutil.FullSyncPeriod bounds the time between full resyncs.
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs)
return proxier, nil
}
@ -1167,19 +1168,18 @@ func (proxier *Proxier) syncProxyRules() {
return
}
// Keep track of how long syncs take.
start := time.Now()
//
// Below this point we will not return until we try to write the nftables rules.
//
// The value of proxier.needFullSync may change before the defer funcs run, so
// we need to keep track of whether it was set at the *start* of the sync.
tryPartialSync := !proxier.needFullSync
doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod)
// Keep track of how long syncs take.
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
if tryPartialSync {
if !doFullSync {
metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
} else {
metrics.SyncFullProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
@ -1190,7 +1190,7 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
proxier.logger.V(2).Info("Syncing nftables rules")
proxier.logger.V(2).Info("Syncing nftables rules", "fullSync", doFullSync)
success := false
defer func() {
@ -1201,6 +1201,8 @@ func (proxier *Proxier) syncProxyRules() {
// been flushed, so we've lost the state needed to be able to do
// a partial sync.
proxier.needFullSync = true
} else if doFullSync {
proxier.lastFullSync = time.Now()
}
}()
@ -1228,7 +1230,7 @@ func (proxier *Proxier) syncProxyRules() {
// (with a later timestamp) at the end of the sync.
proxier.logger.Error(err, "Unable to delete stale chains; will retry later")
metrics.NFTablesCleanupFailuresTotal.WithLabelValues(string(proxier.ipFamily)).Inc()
tryPartialSync = false
doFullSync = true
// Log failed transaction and list full kube-proxy table.
proxier.logFailure(tx)
@ -1238,7 +1240,7 @@ func (proxier *Proxier) syncProxyRules() {
// Now start the actual syncing transaction
tx := proxier.nftables.NewTransaction()
if !tryPartialSync {
if doFullSync {
proxier.setupNFTables(tx)
}
@ -1286,7 +1288,7 @@ func (proxier *Proxier) syncProxyRules() {
// skipServiceUpdate is used for all service-related chains and their elements.
// If no changes were done to the service or its endpoints, these objects may be skipped.
skipServiceUpdate := tryPartialSync &&
skipServiceUpdate := !doFullSync &&
!serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) &&
!endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName)

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
@ -37,6 +38,9 @@ const (
// IPv6ZeroCIDR is the CIDR block for the whole IPv6 address space
IPv6ZeroCIDR = "::/0"
// FullSyncPeriod is the maximum interval between periodic full syncs in the iptables and nftables proxiers.
FullSyncPeriod = 1 * time.Hour
)
// IsZeroCIDR checks whether the input CIDR string is either