Merge pull request #46266 from thockin/proxy-periodic-runner-2

Automatic merge from submit-queue (batch tested with PRs 44774, 46266, 46248, 46403, 46430)

kube-proxy: ratelimit runs of iptables by sync-period flags

This bounds how frequently iptables can be synced.  It will be no more often than every 10 seconds and no less often than every 1 minute, by default.

@timothysc FYI

@dcbw @freehan FYI
This commit is contained in:
Kubernetes Submit Queue 2017-05-25 06:17:56 -07:00 committed by GitHub
commit ee671e64ee
19 changed files with 720 additions and 120 deletions

2
Godeps/Godeps.json generated
View File

@ -1750,7 +1750,7 @@
},
{
"ImportPath": "github.com/juju/ratelimit",
"Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177"
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
},
{
"ImportPath": "github.com/kardianos/osext",

View File

@ -620,6 +620,7 @@ function start-kube-proxy {
if [[ -n "${FEATURE_GATES:-}" ]]; then
params+=" --feature-gates=${FEATURE_GATES}"
fi
params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s"
if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then
params+=" ${KUBEPROXY_TEST_ARGS}"
fi

View File

@ -825,6 +825,7 @@ function start-kube-proxy {
if [[ -n "${FEATURE_GATES:-}" ]]; then
params+=" --feature-gates=${FEATURE_GATES}"
fi
params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s"
if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then
params+=" ${KUBEPROXY_TEST_ARGS}"
fi

View File

@ -29,8 +29,10 @@
{% set feature_gates = "--feature-gates=" + grains.feature_gates -%}
{% endif -%}
{% set throttles = "--iptables-sync-period=1m --iptables-min-sync-period=10s" -%}
# test_args should always go last to overwrite prior configuration
{% set params = log_level + " " + feature_gates + " " + test_args -%}
{% set params = log_level + " " + throttles + " " + feature_gates + " " + test_args -%}
{% set container_env = "" -%}

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/sysctl:go_default_library",
@ -31,10 +32,10 @@ go_library(
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
@ -46,6 +47,7 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library",

View File

@ -37,10 +37,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/helper"
apiservice "k8s.io/kubernetes/pkg/api/service"
@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@ -160,7 +161,7 @@ func (e *endpointsInfo) String() string {
}
// returns a new serviceInfo struct
func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
onlyNodeLocalEndpoints := false
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
apiservice.RequestsOnlyLocalTraffic(service) {
@ -185,7 +186,7 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
if apiservice.NeedsHealthCheck(service) {
p := apiservice.GetServiceHealthCheckNodePort(service)
if p == 0 {
glog.Errorf("Service %q has no healthcheck nodeport", serviceName)
glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
} else {
info.healthCheckNodePort = int(p)
}
@ -267,46 +268,46 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
existingPorts := sets.NewString()
for serviceName, info := range other {
existingPorts.Insert(serviceName.Port)
_, exists := (*sm)[serviceName]
for svcPortName, info := range other {
existingPorts.Insert(svcPortName.Port)
_, exists := (*sm)[svcPortName]
if !exists {
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
} else {
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
}
(*sm)[serviceName] = info
(*sm)[svcPortName] = info
}
return existingPorts
}
func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
for serviceName := range other {
if existingPorts.Has(serviceName.Port) {
for svcPortName := range other {
if existingPorts.Has(svcPortName.Port) {
continue
}
info, exists := (*sm)[serviceName]
info, exists := (*sm)[svcPortName]
if exists {
glog.V(1).Infof("Removing service %q", serviceName)
glog.V(1).Infof("Removing service port %q", svcPortName)
if info.protocol == api.ProtocolUDP {
staleServices.Insert(info.clusterIP.String())
}
delete(*sm, serviceName)
delete(*sm, svcPortName)
} else {
glog.Errorf("Service %q removed, but doesn't exists", serviceName)
glog.Errorf("Service port %q removed, but doesn't exists", svcPortName)
}
}
}
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
for svcPort := range other {
em[svcPort] = other[svcPort]
for svcPortName := range other {
em[svcPortName] = other[svcPortName]
}
}
func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
for svcPort := range other {
delete(em, svcPort)
for svcPortName := range other {
delete(em, svcPortName)
}
}
@ -314,7 +315,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
// and services that provide the actual backends.
type Proxier struct {
// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since last syncProxyRules call. For a single object,
// services that happened since iptables was synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges endpointsChangeMap
@ -330,12 +331,9 @@ type Proxier struct {
endpointsSynced bool
servicesSynced bool
initialized int32
throttle flowcontrol.RateLimiter
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
minSyncPeriod time.Duration
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
@ -409,7 +407,7 @@ func NewProxier(ipt utiliptables.Interface,
) (*Proxier, error) {
// check valid user input
if minSyncPeriod > syncPeriod {
return nil, fmt.Errorf("min-sync (%v) must be <= sync(%v)", minSyncPeriod, syncPeriod)
return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod)
}
// Set the route_localnet sysctl we need for
@ -442,23 +440,12 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
var throttle flowcontrol.RateLimiter
// Defaulting back to not limit sync rate when minSyncPeriod is 0.
if minSyncPeriod != 0 {
syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
// The average use case will process 2 updates in short succession
throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2)
}
return &Proxier{
proxier := &Proxier{
portsMap: make(map[localPort]closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
@ -475,7 +462,11 @@ func NewProxier(ipt utiliptables.Interface,
filterRules: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
}, nil
}
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
return proxier, nil
}
// CleanupLeftovers removes all iptables rules and chains created by the Proxier
@ -566,24 +557,18 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
return encounteredError
}
// Sync is called to immediately synchronize the proxier state to iptables
// Sync is called to synchronize the proxier state to iptables as soon as possible.
func (proxier *Proxier) Sync() {
proxier.syncProxyRules()
proxier.syncRunner.Run()
}
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
t := time.NewTicker(proxier.syncPeriod)
defer t.Stop()
// Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
for {
<-t.C
glog.V(6).Infof("Periodic sync")
proxier.Sync()
}
proxier.syncRunner.Loop(wait.NeverStop)
}
func (proxier *Proxier) setInitialized(value bool) {
@ -601,21 +586,21 @@ func (proxier *Proxier) isInitialized() bool {
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
proxier.syncProxyRules()
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
proxier.syncProxyRules()
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
proxier.syncProxyRules()
proxier.syncRunner.Run()
}
}
@ -624,7 +609,8 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.servicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
@ -662,9 +648,9 @@ func updateServiceMap(
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
hcServices = make(map[types.NamespacedName]uint16)
for svcPort, info := range serviceMap {
for svcPortName, info := range serviceMap {
if info.healthCheckNodePort != 0 {
hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort)
hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
}
}
@ -674,21 +660,21 @@ func updateServiceMap(
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
proxier.syncProxyRules()
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.syncProxyRules()
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
proxier.syncProxyRules()
proxier.syncRunner.Run()
}
}
@ -697,7 +683,8 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
@ -738,18 +725,18 @@ func updateEndpointsMap(
// <staleEndpoints> are modified by this function with detected stale
// connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
for svcPort, epList := range oldEndpointsMap {
for svcPortName, epList := range oldEndpointsMap {
for _, ep := range epList {
stale := true
for i := range newEndpointsMap[svcPort] {
if *newEndpointsMap[svcPort][i] == *ep {
for i := range newEndpointsMap[svcPortName] {
if *newEndpointsMap[svcPortName][i] == *ep {
stale = false
break
}
}
if stale {
glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint)
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint)
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true
}
}
}
@ -757,10 +744,10 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
localIPs := make(map[types.NamespacedName]sets.String)
for svcPort := range endpointsMap {
for _, ep := range endpointsMap[svcPort] {
for svcPortName := range endpointsMap {
for _, ep := range endpointsMap[svcPortName] {
if ep.isLocal {
nsn := svcPort.NamespacedName
nsn := svcPortName.NamespacedName
if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString()
}
@ -792,7 +779,7 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd
glog.Warningf("ignoring invalid endpoint port %s", port.Name)
continue
}
svcPort := proxy.ServicePortName{
svcPortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
Port: port.Name,
}
@ -806,14 +793,14 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd
endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
}
endpointsMap[svcPort] = append(endpointsMap[svcPort], epInfo)
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo)
}
if glog.V(3) {
newEPList := []string{}
for _, ep := range endpointsMap[svcPort] {
for _, ep := range endpointsMap[svcPortName] {
newEPList = append(newEPList, ep.endpoint)
}
glog.Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
glog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList)
}
}
}
@ -835,8 +822,8 @@ func serviceToServiceMap(service *api.Service) proxyServiceMap {
serviceMap := make(proxyServiceMap)
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
serviceMap[serviceName] = newServiceInfo(serviceName, servicePort, service)
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service)
}
return serviceMap
}
@ -909,14 +896,11 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
if proxier.throttle != nil {
proxier.throttle.Accept()
}
start := time.Now()
defer func() {
SyncProxyRulesLatency.Observe(sinceInMicroseconds(start))
@ -928,10 +912,9 @@ func (proxier *Proxier) syncProxyRules() {
return
}
// We assume that if syncProxyRules was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, caller are
// responsible for detecting no-op changes and not calling syncProxyRules in
// such cases.
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
hcServices, staleServices := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
hcEndpoints, staleEndpoints := updateEndpointsMap(

View File

@ -21,6 +21,7 @@ import (
"reflect"
"strconv"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
@ -34,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
@ -383,7 +385,7 @@ const testHostname = "test-hostname"
func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method.
return &Proxier{
p := &Proxier{
exec: &exec.FakeExec{},
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
@ -401,6 +403,8 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
}
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
return p
}
func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {

View File

@ -10,13 +10,23 @@ load(
go_library(
name = "go_default_library",
srcs = ["runner.go"],
srcs = [
"bounded_frequency_runner.go",
"runner.go",
],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["runner_test.go"],
srcs = [
"bounded_frequency_runner_test.go",
"runner_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
)

View File

@ -0,0 +1,229 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package async
import (
"fmt"
"sync"
"time"
"k8s.io/client-go/util/flowcontrol"
"github.com/golang/glog"
)
// BoundedFrequencyRunner manages runs of a user-provided function.
// See NewBoundedFrequencyRunner for examples.
type BoundedFrequencyRunner struct {
name string // the name of this instance
minInterval time.Duration // the min time between runs, modulo bursts
maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run
lastRun time.Time // time of last run
timer timer // timer for deferred runs
limiter rateLimiter // rate limiter for on-demand runs
}
// designed so that flowcontrol.RateLimiter satisfies
type rateLimiter interface {
TryAccept() bool
Stop()
}
type nullLimiter struct{}
func (nullLimiter) TryAccept() bool {
return true
}
func (nullLimiter) Stop() {}
var _ rateLimiter = nullLimiter{}
// for testing
type timer interface {
// C returns the timer's selectable channel.
C() <-chan time.Time
// See time.Timer.Reset.
Reset(d time.Duration) bool
// See time.Timer.Stop.
Stop() bool
// See time.Now.
Now() time.Time
// See time.Since.
Since(t time.Time) time.Duration
// See time.Sleep.
Sleep(d time.Duration)
}
// implement our timer in terms of std time.Timer.
type realTimer struct {
*time.Timer
}
func (rt realTimer) C() <-chan time.Time {
return rt.Timer.C
}
func (rt realTimer) Now() time.Time {
return time.Now()
}
func (rt realTimer) Since(t time.Time) time.Duration {
return time.Since(t)
}
func (rt realTimer) Sleep(d time.Duration) {
time.Sleep(d)
}
var _ timer = realTimer{}
// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
// which will manage runs of the specified function.
//
// All runs will be async to the caller of BoundedFrequencyRunner.Run, but
// multiple runs are serialized. If the function needs to hold locks, it must
// take them internally.
//
// Runs of the funtion will have at least minInterval between them (from
// completion to next start), except that up to bursts may be allowed. Burst
// runs are "accumulated" over time, one per minInterval up to burstRuns total.
// This can be used, for example, to mitigate the impact of expensive operations
// being called in response to user-initiated operations. Run requests that
// would violate the minInterval are coallesced and run at the next opportunity.
//
// The function will be run at least once per maxInterval. For example, this can
// force periodic refreshes of state in the absence of anyone calling Run.
//
// Examples:
//
// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1)
// - fn will have at least 1 second between runs
// - fn will have no more than 5 seconds between runs
//
// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
// - fn will have at least 3 seconds between runs, with up to 3 burst runs
// - fn will have no more than 10 seconds between runs
//
// The maxInterval must be greater than or equal to the minInterval, If the
// caller passes a maxInterval less than minInterval, this function will panic.
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately
<-timer.C() // consume the first tick
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}
// Make an instance with dependencies injected.
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
if maxInterval < minInterval {
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval))
}
if timer == nil {
panic(fmt.Sprintf("%s: timer must be non-nil", name))
}
bfr := &BoundedFrequencyRunner{
name: name,
fn: fn,
minInterval: minInterval,
maxInterval: maxInterval,
run: make(chan struct{}, 16),
timer: timer,
}
if minInterval == 0 {
bfr.limiter = nullLimiter{}
} else {
// allow burst updates in short succession
qps := float32(time.Second) / float32(minInterval)
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
}
return bfr
}
// Loop handles the periodic timer and run requests. This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
glog.V(3).Infof("%s Loop running", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
glog.V(3).Infof("%s Loop stopping", bfr.name)
return
case <-bfr.timer.C():
bfr.tryRun()
case <-bfr.run:
bfr.tryRun()
}
}
}
// Run the function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
func (bfr *BoundedFrequencyRunner) Run() {
bfr.run <- struct{}{}
}
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) stop() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
bfr.limiter.Stop()
bfr.timer.Stop()
}
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
if bfr.limiter.TryAccept() {
// We're allowed to run the function right now.
bfr.fn()
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval)
glog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
return
}
// It can't run right now, figure out when it can run next.
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
glog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
if nextPossible < nextScheduled {
// Set the timer for ASAP, but don't drain here. Assuming Loop is running,
// it might get a delivery in the mean time, but that is OK.
bfr.timer.Stop()
bfr.timer.Reset(nextPossible)
glog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
}
}

View File

@ -0,0 +1,332 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package async
import (
"sync"
"testing"
"time"
)
// Track calls to the managed function.
type receiver struct {
lock sync.Mutex
run bool
}
func (r *receiver) F() {
r.lock.Lock()
defer r.lock.Unlock()
r.run = true
}
func (r *receiver) reset() bool {
r.lock.Lock()
defer r.lock.Unlock()
was := r.run
r.run = false
return was
}
// A single change event in the fake timer.
type timerUpdate struct {
active bool
next time.Duration // iff active == true
}
// Fake time.
type fakeTimer struct {
c chan time.Time
lock sync.Mutex
now time.Time
active bool
updated chan timerUpdate
}
func newFakeTimer() *fakeTimer {
ft := &fakeTimer{
c: make(chan time.Time),
updated: make(chan timerUpdate),
}
return ft
}
func (ft *fakeTimer) C() <-chan time.Time {
return ft.c
}
func (ft *fakeTimer) Reset(in time.Duration) bool {
ft.lock.Lock()
defer ft.lock.Unlock()
was := ft.active
ft.active = true
ft.updated <- timerUpdate{
active: true,
next: in,
}
return was
}
func (ft *fakeTimer) Stop() bool {
ft.lock.Lock()
defer ft.lock.Unlock()
was := ft.active
ft.active = false
ft.updated <- timerUpdate{
active: false,
}
return was
}
func (ft *fakeTimer) Now() time.Time {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.now
}
func (ft *fakeTimer) Since(t time.Time) time.Duration {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.now.Sub(t)
}
func (ft *fakeTimer) Sleep(d time.Duration) {
ft.lock.Lock()
defer ft.lock.Unlock()
ft.advance(d)
}
// advance the current time.
func (ft *fakeTimer) advance(d time.Duration) {
ft.lock.Lock()
defer ft.lock.Unlock()
ft.now = ft.now.Add(d)
}
// send a timer tick.
func (ft *fakeTimer) tick() {
ft.lock.Lock()
defer ft.lock.Unlock()
ft.active = false
ft.c <- ft.now
}
// return the calling line number (for printing)
// test the timer's state
func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
if upd.active != active {
t.Fatalf("%s: expected timer active=%v", name, active)
}
if active && upd.next != next {
t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
}
}
// test and reset the receiver's state
func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
triggered := receiver.reset()
if expected && !triggered {
t.Fatalf("%s: function should have been called", name)
} else if !expected && triggered {
t.Fatalf("%s: function should not have been called", name)
}
}
// Durations embedded in test cases depend on these.
var minInterval = 1 * time.Second
var maxInterval = 10 * time.Second
func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
upd := <-timer.updated // wait for stop
checkReceiver(name, t, obj, expectCall)
checkReceiver(name, t, obj, false) // prove post-condition
checkTimer(name, t, upd, false, 0)
upd = <-timer.updated // wait for reset
checkTimer(name, t, upd, true, expectNext)
}
func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
waitForReset(name, t, timer, obj, true, maxInterval)
}
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
waitForReset(name, t, timer, obj, false, expectNext)
}
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately.
// rel=0ms
runner.Run()
waitForRun("first run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(500 * time.Millisecond) // rel=500ms
runner.Run()
waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(499 * time.Millisecond) // rel=999ms
runner.Run()
waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
// Run again, once minInterval has passed (race with timer).
timer.advance(1 * time.Millisecond) // rel=1000ms
runner.Run()
waitForRun("second run", t, timer, obj)
// Run again, before minInterval expires.
// rel=0ms
runner.Run()
waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // rel=1ms
runner.Run()
waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
// Let the timer tick prematurely.
timer.advance(998 * time.Millisecond) // rel=999ms
timer.tick()
waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond)
// Let the timer tick.
timer.advance(1 * time.Millisecond) // rel=1000ms
timer.tick()
waitForRun("first tick", t, timer, obj)
// Let the timer tick.
timer.advance(10 * time.Second) // rel=10000ms
timer.tick()
waitForRun("second tick", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // rel=1ms
runner.Run()
waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond)
// Let the timer tick.
timer.advance(999 * time.Millisecond) // rel=1000ms
timer.tick()
waitForRun("third tick", t, timer, obj)
// Clean up.
stop <- struct{}{}
}
func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately.
// abs=0ms, rel=0ms
runner.Run()
waitForRun("first run", t, timer, obj)
// Run again, before minInterval expires, with burst.
timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
runner.Run()
waitForRun("second run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
runner.Run()
waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
runner.Run()
waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
runner.Run()
waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
// Run again, once burst has replenished.
timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
runner.Run()
waitForRun("third run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
runner.Run()
waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
runner.Run()
waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
// Run again, once burst has replenished.
timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
runner.Run()
waitForRun("fourth run", t, timer, obj)
// Run again, once burst has fully replenished.
timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
runner.Run()
waitForRun("fifth run", t, timer, obj)
runner.Run()
waitForRun("sixth run", t, timer, obj)
runner.Run()
waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
// Let the timer tick.
timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
timer.tick()
waitForRun("first tick", t, timer, obj)
// Let the timer tick.
timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
timer.tick()
waitForRun("second tick", t, timer, obj)
// Clean up.
stop <- struct{}{}
}

View File

@ -480,7 +480,7 @@
},
{
"ImportPath": "github.com/juju/ratelimit",
"Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177"
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
},
{
"ImportPath": "github.com/karlseguin/ccache",

View File

@ -160,7 +160,7 @@
},
{
"ImportPath": "github.com/juju/ratelimit",
"Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177"
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
},
{
"ImportPath": "github.com/mailru/easyjson/buffer",

View File

@ -51,6 +51,22 @@ type tokenBucketRateLimiter struct {
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
return newTokenBucketRateLimiter(limiter, qps)
}
// An injectable, mockable clock interface.
type Clock interface {
ratelimit.Clock
}
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing.
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter {
limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock)
return newTokenBucketRateLimiter(limiter, qps)
}
func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter {
return &tokenBucketRateLimiter{
limiter: limiter,
qps: qps,

View File

@ -236,7 +236,7 @@
},
{
"ImportPath": "github.com/juju/ratelimit",
"Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177"
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
},
{
"ImportPath": "github.com/mailru/easyjson/buffer",

View File

@ -228,7 +228,7 @@
},
{
"ImportPath": "github.com/juju/ratelimit",
"Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177"
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
},
{
"ImportPath": "github.com/mailru/easyjson/buffer",

View File

@ -228,7 +228,7 @@
},
{
"ImportPath": "github.com/juju/ratelimit",
"Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177"
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
},
{
"ImportPath": "github.com/mailru/easyjson/buffer",

View File

@ -37,9 +37,6 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//vendor/github.com/emicklei/go-restful-swagger12/test_package:all-srcs",
],
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,16 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -2,7 +2,7 @@
// Licensed under the LGPLv3 with static-linking exception.
// See LICENCE file for details.
// The ratelimit package provides an efficient token bucket implementation
// Package ratelimit provides an efficient token bucket implementation
// that can be used to limit the rate of arbitrary things.
// See http://en.wikipedia.org/wiki/Token_bucket.
package ratelimit
@ -21,6 +21,7 @@ type Bucket struct {
capacity int64
quantum int64
fillInterval time.Duration
clock Clock
// The mutex guards the fields following it.
mu sync.Mutex
@ -33,12 +34,37 @@ type Bucket struct {
availTick int64
}
// Clock is used to inject testable fakes.
type Clock interface {
Now() time.Time
Sleep(d time.Duration)
}
// realClock implements Clock in terms of standard time functions.
type realClock struct{}
// Now is identical to time.Now.
func (realClock) Now() time.Time {
return time.Now()
}
// Sleep is identical to time.Sleep.
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}
// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithQuantum(fillInterval, capacity, 1)
return NewBucketWithClock(fillInterval, capacity, realClock{})
}
// NewBucketWithClock is identical to NewBucket but injects a testable clock
// interface.
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}
// rateMargin specifes the allowed variance of actual
@ -51,12 +77,18 @@ const rateMargin = 0.01
// at high rates, the actual rate may be up to 1% different from the
// specified rate.
func NewBucketWithRate(rate float64, capacity int64) *Bucket {
return NewBucketWithRateAndClock(rate, capacity, realClock{})
}
// NewBucketWithRateAndClock is identical to NewBucketWithRate but injects a
// testable clock interface.
func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket {
for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
fillInterval := time.Duration(1e9 * float64(quantum) / rate)
if fillInterval <= 0 {
continue
}
tb := NewBucketWithQuantum(fillInterval, capacity, quantum)
tb := NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, clock)
if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
return tb
}
@ -79,6 +111,12 @@ func nextQuantum(q int64) int64 {
// the specification of the quantum size - quantum tokens
// are added every fillInterval.
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, realClock{})
}
// NewBucketWithQuantumAndClock is identical to NewBucketWithQuantum but injects
// a testable clock interface.
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
if fillInterval <= 0 {
panic("token bucket fill interval is not > 0")
}
@ -89,7 +127,8 @@ func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *
panic("token bucket quantum is not > 0")
}
return &Bucket{
startTime: time.Now(),
clock: clock,
startTime: clock.Now(),
capacity: capacity,
quantum: quantum,
avail: capacity,
@ -101,7 +140,7 @@ func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *
// available.
func (tb *Bucket) Wait(count int64) {
if d := tb.Take(count); d > 0 {
time.Sleep(d)
tb.clock.Sleep(d)
}
}
@ -113,7 +152,7 @@ func (tb *Bucket) Wait(count int64) {
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {
d, ok := tb.TakeMaxDuration(count, maxWait)
if d > 0 {
time.Sleep(d)
tb.clock.Sleep(d)
}
return ok
}
@ -127,7 +166,7 @@ const infinityDuration time.Duration = 0x7fffffffffffffff
// Note that if the request is irrevocable - there is no way to return
// tokens to the bucket once this method commits us to taking them.
func (tb *Bucket) Take(count int64) time.Duration {
d, _ := tb.take(time.Now(), count, infinityDuration)
d, _ := tb.take(tb.clock.Now(), count, infinityDuration)
return d
}
@ -141,14 +180,14 @@ func (tb *Bucket) Take(count int64) time.Duration {
// wait until the tokens are actually available, and reports
// true.
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
return tb.take(time.Now(), count, maxWait)
return tb.take(tb.clock.Now(), count, maxWait)
}
// TakeAvailable takes up to count immediately available tokens from the
// bucket. It returns the number of tokens removed, or zero if there are
// no available tokens. It does not block.
func (tb *Bucket) TakeAvailable(count int64) int64 {
return tb.takeAvailable(time.Now(), count)
return tb.takeAvailable(tb.clock.Now(), count)
}
// takeAvailable is the internal version of TakeAvailable - it takes the
@ -178,7 +217,7 @@ func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
// tokens could have changed in the meantime. This method is intended
// primarily for metrics reporting and debugging.
func (tb *Bucket) Available() int64 {
return tb.available(time.Now())
return tb.available(tb.clock.Now())
}
// available is the internal version of available - it takes the current time as