diff --git a/cluster/gce/container-linux/configure-helper.sh b/cluster/gce/container-linux/configure-helper.sh index 1e98e2a764c..9e4409fade2 100755 --- a/cluster/gce/container-linux/configure-helper.sh +++ b/cluster/gce/container-linux/configure-helper.sh @@ -615,6 +615,7 @@ function start-kube-proxy { if [[ -n "${FEATURE_GATES:-}" ]]; then params+=" --feature-gates=${FEATURE_GATES}" fi + params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s" if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then params+=" ${KUBEPROXY_TEST_ARGS}" fi diff --git a/cluster/gce/gci/configure-helper.sh b/cluster/gce/gci/configure-helper.sh index 261229486ab..b32183b48ad 100644 --- a/cluster/gce/gci/configure-helper.sh +++ b/cluster/gce/gci/configure-helper.sh @@ -820,6 +820,7 @@ function start-kube-proxy { if [[ -n "${FEATURE_GATES:-}" ]]; then params+=" --feature-gates=${FEATURE_GATES}" fi + params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s" if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then params+=" ${KUBEPROXY_TEST_ARGS}" fi diff --git a/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest b/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest index 7818472d293..7767123ba2f 100644 --- a/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest +++ b/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest @@ -29,8 +29,10 @@ {% set feature_gates = "--feature-gates=" + grains.feature_gates -%} {% endif -%} +{% set throttles = "--iptables-sync-period=1m --iptables-min-sync-period=10s" -%} + # test_args should always go last to overwrite prior configuration -{% set params = log_level + " " + feature_gates + " " + test_args -%} +{% set params = log_level + " " + throttles + " " + feature_gates + " " + test_args -%} {% set container_env = "" -%} diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 792f115e0fd..cbccdc2918f 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/sysctl:go_default_library", @@ -31,10 +32,10 @@ go_library( "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) @@ -46,6 +47,7 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index db616b9bbbc..16776973386 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -37,10 +37,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/helper" apiservice "k8s.io/kubernetes/pkg/api/service" @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/util/async" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -314,7 +315,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { // and services that provide the actual backends. type Proxier struct { // endpointsChanges and serviceChanges contains all changes to endpoints and - // services that happened since last syncProxyRules call. For a single object, + // services that happened since iptables was synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. endpointsChanges endpointsChangeMap @@ -330,12 +331,9 @@ type Proxier struct { endpointsSynced bool servicesSynced bool initialized int32 - - throttle flowcontrol.RateLimiter + syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules // These are effectively const and do not need the mutex to be held. - syncPeriod time.Duration - minSyncPeriod time.Duration iptables utiliptables.Interface masqueradeAll bool masqueradeMark string @@ -409,7 +407,7 @@ func NewProxier(ipt utiliptables.Interface, ) (*Proxier, error) { // check valid user input if minSyncPeriod > syncPeriod { - return nil, fmt.Errorf("min-sync (%v) must be <= sync(%v)", minSyncPeriod, syncPeriod) + return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod) } // Set the route_localnet sysctl we need for @@ -442,23 +440,12 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - var throttle flowcontrol.RateLimiter - // Defaulting back to not limit sync rate when minSyncPeriod is 0. - if minSyncPeriod != 0 { - syncsPerSecond := float32(time.Second) / float32(minSyncPeriod) - // The average use case will process 2 updates in short succession - throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2) - } - - return &Proxier{ + proxier := &Proxier{ portsMap: make(map[localPort]closeable), serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), endpointsChanges: newEndpointsChangeMap(hostname), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - throttle: throttle, iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -475,7 +462,11 @@ func NewProxier(ipt utiliptables.Interface, filterRules: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil), - }, nil + } + burstSyncs := 2 + glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + return proxier, nil } // CleanupLeftovers removes all iptables rules and chains created by the Proxier @@ -566,24 +557,18 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { return encounteredError } -// Sync is called to immediately synchronize the proxier state to iptables +// Sync is called to synchronize the proxier state to iptables as soon as possible. func (proxier *Proxier) Sync() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { - t := time.NewTicker(proxier.syncPeriod) - defer t.Stop() // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { proxier.healthzServer.UpdateTimestamp() } - for { - <-t.C - glog.V(6).Infof("Periodic sync") - proxier.Sync() - } + proxier.syncRunner.Loop(wait.NeverStop) } func (proxier *Proxier) setInitialized(value bool) { @@ -601,21 +586,21 @@ func (proxier *Proxier) isInitialized() bool { func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } @@ -624,7 +609,8 @@ func (proxier *Proxier) OnServiceSynced() { proxier.servicesSynced = true proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.mu.Unlock() - // Call it unconditionally - this is called once per lifetime. + + // Sync unconditionally - this is called once per lifetime. proxier.syncProxyRules() } @@ -674,21 +660,21 @@ func updateServiceMap( func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } @@ -697,7 +683,8 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.endpointsSynced = true proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.mu.Unlock() - // Call it unconditionally - this is called once per lifetime. + + // Sync unconditionally - this is called once per lifetime. proxier.syncProxyRules() } @@ -909,14 +896,11 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() -// assumes proxier.mu is held +// This assumes proxier.mu is NOT held func (proxier *Proxier) syncProxyRules() { proxier.mu.Lock() defer proxier.mu.Unlock() - if proxier.throttle != nil { - proxier.throttle.Accept() - } start := time.Now() defer func() { SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) @@ -928,10 +912,9 @@ func (proxier *Proxier) syncProxyRules() { return } - // We assume that if syncProxyRules was called, we really want to sync them, - // even if nothing changed in the meantime. In other words, caller are - // responsible for detecting no-op changes and not calling syncProxyRules in - // such cases. + // We assume that if this was called, we really want to sync them, + // even if nothing changed in the meantime. In other words, callers are + // responsible for detecting no-op changes and not calling this function. hcServices, staleServices := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) hcEndpoints, staleEndpoints := updateEndpointsMap( diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index a665c3c57fe..33f2386c467 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -21,6 +21,7 @@ import ( "reflect" "strconv" "testing" + "time" "github.com/davecgh/go-spew/spew" @@ -34,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" @@ -383,7 +385,7 @@ const testHostname = "test-hostname" func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. - return &Proxier{ + p := &Proxier{ exec: &exec.FakeExec{}, serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), @@ -401,6 +403,8 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { natChains: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil), } + p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) + return p } func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {