Use BoundedFrequencyRunner in kube-proxy

This commit is contained in:
Tim Hockin 2017-05-21 21:44:45 -07:00
parent bbb80c252b
commit 2856fde23b
6 changed files with 41 additions and 48 deletions

View File

@ -615,6 +615,7 @@ function start-kube-proxy {
if [[ -n "${FEATURE_GATES:-}" ]]; then if [[ -n "${FEATURE_GATES:-}" ]]; then
params+=" --feature-gates=${FEATURE_GATES}" params+=" --feature-gates=${FEATURE_GATES}"
fi fi
params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s"
if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then
params+=" ${KUBEPROXY_TEST_ARGS}" params+=" ${KUBEPROXY_TEST_ARGS}"
fi fi

View File

@ -820,6 +820,7 @@ function start-kube-proxy {
if [[ -n "${FEATURE_GATES:-}" ]]; then if [[ -n "${FEATURE_GATES:-}" ]]; then
params+=" --feature-gates=${FEATURE_GATES}" params+=" --feature-gates=${FEATURE_GATES}"
fi fi
params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s"
if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then
params+=" ${KUBEPROXY_TEST_ARGS}" params+=" ${KUBEPROXY_TEST_ARGS}"
fi fi

View File

@ -29,8 +29,10 @@
{% set feature_gates = "--feature-gates=" + grains.feature_gates -%} {% set feature_gates = "--feature-gates=" + grains.feature_gates -%}
{% endif -%} {% endif -%}
{% set throttles = "--iptables-sync-period=1m --iptables-min-sync-period=10s" -%}
# test_args should always go last to overwrite prior configuration # test_args should always go last to overwrite prior configuration
{% set params = log_level + " " + feature_gates + " " + test_args -%} {% set params = log_level + " " + throttles + " " + feature_gates + " " + test_args -%}
{% set container_env = "" -%} {% set container_env = "" -%}

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/util:go_default_library", "//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/exec:go_default_library", "//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/sysctl:go_default_library", "//pkg/util/sysctl:go_default_library",
@ -31,10 +32,10 @@ go_library(
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
], ],
) )
@ -46,6 +47,7 @@ go_test(
deps = [ deps = [
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/exec:go_default_library", "//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library", "//pkg/util/iptables/testing:go_default_library",

View File

@ -37,10 +37,10 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
clientv1 "k8s.io/client-go/pkg/api/v1" clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/helper" "k8s.io/kubernetes/pkg/api/helper"
apiservice "k8s.io/kubernetes/pkg/api/service" apiservice "k8s.io/kubernetes/pkg/api/service"
@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
utilexec "k8s.io/kubernetes/pkg/util/exec" utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@ -314,7 +315,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
// and services that provide the actual backends. // and services that provide the actual backends.
type Proxier struct { type Proxier struct {
// endpointsChanges and serviceChanges contains all changes to endpoints and // endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since last syncProxyRules call. For a single object, // services that happened since iptables was synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them, // changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those. // current is state after applying all of those.
endpointsChanges endpointsChangeMap endpointsChanges endpointsChangeMap
@ -330,12 +331,9 @@ type Proxier struct {
endpointsSynced bool endpointsSynced bool
servicesSynced bool servicesSynced bool
initialized int32 initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
throttle flowcontrol.RateLimiter
// These are effectively const and do not need the mutex to be held. // These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
minSyncPeriod time.Duration
iptables utiliptables.Interface iptables utiliptables.Interface
masqueradeAll bool masqueradeAll bool
masqueradeMark string masqueradeMark string
@ -409,7 +407,7 @@ func NewProxier(ipt utiliptables.Interface,
) (*Proxier, error) { ) (*Proxier, error) {
// check valid user input // check valid user input
if minSyncPeriod > syncPeriod { if minSyncPeriod > syncPeriod {
return nil, fmt.Errorf("min-sync (%v) must be <= sync(%v)", minSyncPeriod, syncPeriod) return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod)
} }
// Set the route_localnet sysctl we need for // Set the route_localnet sysctl we need for
@ -442,23 +440,12 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
var throttle flowcontrol.RateLimiter proxier := &Proxier{
// Defaulting back to not limit sync rate when minSyncPeriod is 0.
if minSyncPeriod != 0 {
syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
// The average use case will process 2 updates in short succession
throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2)
}
return &Proxier{
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname), endpointsChanges: newEndpointsChangeMap(hostname),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
iptables: ipt, iptables: ipt,
masqueradeAll: masqueradeAll, masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark, masqueradeMark: masqueradeMark,
@ -475,7 +462,11 @@ func NewProxier(ipt utiliptables.Interface,
filterRules: bytes.NewBuffer(nil), filterRules: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil),
}, nil }
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
return proxier, nil
} }
// CleanupLeftovers removes all iptables rules and chains created by the Proxier // CleanupLeftovers removes all iptables rules and chains created by the Proxier
@ -566,24 +557,18 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
return encounteredError return encounteredError
} }
// Sync is called to immediately synchronize the proxier state to iptables // Sync is called to synchronize the proxier state to iptables as soon as possible.
func (proxier *Proxier) Sync() { func (proxier *Proxier) Sync() {
proxier.syncProxyRules() proxier.syncRunner.Run()
} }
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() { func (proxier *Proxier) SyncLoop() {
t := time.NewTicker(proxier.syncPeriod)
defer t.Stop()
// Update healthz timestamp at beginning in case Sync() never succeeds. // Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp() proxier.healthzServer.UpdateTimestamp()
} }
for { proxier.syncRunner.Loop(wait.NeverStop)
<-t.C
glog.V(6).Infof("Periodic sync")
proxier.Sync()
}
} }
func (proxier *Proxier) setInitialized(value bool) { func (proxier *Proxier) setInitialized(value bool) {
@ -601,21 +586,21 @@ func (proxier *Proxier) isInitialized() bool {
func (proxier *Proxier) OnServiceAdd(service *api.Service) { func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
proxier.syncProxyRules() proxier.syncRunner.Run()
} }
} }
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
proxier.syncProxyRules() proxier.syncRunner.Run()
} }
} }
func (proxier *Proxier) OnServiceDelete(service *api.Service) { func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
proxier.syncProxyRules() proxier.syncRunner.Run()
} }
} }
@ -624,7 +609,8 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.servicesSynced = true proxier.servicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock() proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules() proxier.syncProxyRules()
} }
@ -674,21 +660,21 @@ func updateServiceMap(
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
proxier.syncProxyRules() proxier.syncRunner.Run()
} }
} }
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.syncProxyRules() proxier.syncRunner.Run()
} }
} }
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
proxier.syncProxyRules() proxier.syncRunner.Run()
} }
} }
@ -697,7 +683,8 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.endpointsSynced = true proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock() proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules() proxier.syncProxyRules()
} }
@ -909,14 +896,11 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
// This is where all of the iptables-save/restore calls happen. // This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit() // The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held // This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() { func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
if proxier.throttle != nil {
proxier.throttle.Accept()
}
start := time.Now() start := time.Now()
defer func() { defer func() {
SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) SyncProxyRulesLatency.Observe(sinceInMicroseconds(start))
@ -928,10 +912,9 @@ func (proxier *Proxier) syncProxyRules() {
return return
} }
// We assume that if syncProxyRules was called, we really want to sync them, // We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, caller are // even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling syncProxyRules in // responsible for detecting no-op changes and not calling this function.
// such cases.
hcServices, staleServices := updateServiceMap( hcServices, staleServices := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges) proxier.serviceMap, &proxier.serviceChanges)
hcEndpoints, staleEndpoints := updateEndpointsMap( hcEndpoints, staleEndpoints := updateEndpointsMap(

View File

@ -21,6 +21,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"testing" "testing"
"time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
@ -34,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
@ -383,7 +385,7 @@ const testHostname = "test-hostname"
func NewFakeProxier(ipt utiliptables.Interface) *Proxier { func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine // TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method. // invocation into a Run() method.
return &Proxier{ p := &Proxier{
exec: &exec.FakeExec{}, exec: &exec.FakeExec{},
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
@ -401,6 +403,8 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
natChains: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil),
} }
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
return p
} }
func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool { func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {