diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 457664bcf88..48ce7dbe213 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1750,7 +1750,7 @@ }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" + "Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342" }, { "ImportPath": "github.com/kardianos/osext", diff --git a/cluster/gce/container-linux/configure-helper.sh b/cluster/gce/container-linux/configure-helper.sh index 7263677fcdf..7924fd3cff1 100755 --- a/cluster/gce/container-linux/configure-helper.sh +++ b/cluster/gce/container-linux/configure-helper.sh @@ -620,6 +620,7 @@ function start-kube-proxy { if [[ -n "${FEATURE_GATES:-}" ]]; then params+=" --feature-gates=${FEATURE_GATES}" fi + params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s" if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then params+=" ${KUBEPROXY_TEST_ARGS}" fi diff --git a/cluster/gce/gci/configure-helper.sh b/cluster/gce/gci/configure-helper.sh index e6d2ef7b94c..24d1db1984e 100644 --- a/cluster/gce/gci/configure-helper.sh +++ b/cluster/gce/gci/configure-helper.sh @@ -825,6 +825,7 @@ function start-kube-proxy { if [[ -n "${FEATURE_GATES:-}" ]]; then params+=" --feature-gates=${FEATURE_GATES}" fi + params+=" --iptables-sync-period=1m --iptables-min-sync-period=10s" if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then params+=" ${KUBEPROXY_TEST_ARGS}" fi diff --git a/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest b/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest index 7818472d293..7767123ba2f 100644 --- a/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest +++ b/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest @@ -29,8 +29,10 @@ {% set feature_gates = "--feature-gates=" + grains.feature_gates -%} {% endif -%} +{% set throttles = "--iptables-sync-period=1m --iptables-min-sync-period=10s" -%} + # test_args should always go last to overwrite prior configuration -{% set params = log_level + " " + feature_gates + " " + test_args -%} +{% set params = log_level + " " + throttles + " " + feature_gates + " " + test_args -%} {% set container_env = "" -%} diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 792f115e0fd..cbccdc2918f 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/sysctl:go_default_library", @@ -31,10 +32,10 @@ go_library( "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) @@ -46,6 +47,7 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9805c86dff7..16776973386 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -37,10 +37,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/helper" apiservice "k8s.io/kubernetes/pkg/api/service" @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/util/async" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -160,7 +161,7 @@ func (e *endpointsInfo) String() string { } // returns a new serviceInfo struct -func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { +func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { onlyNodeLocalEndpoints := false if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && apiservice.RequestsOnlyLocalTraffic(service) { @@ -185,7 +186,7 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se if apiservice.NeedsHealthCheck(service) { p := apiservice.GetServiceHealthCheckNodePort(service) if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", serviceName) + glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) } else { info.healthCheckNodePort = int(p) } @@ -267,46 +268,46 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { existingPorts := sets.NewString() - for serviceName, info := range other { - existingPorts.Insert(serviceName.Port) - _, exists := (*sm)[serviceName] + for svcPortName, info := range other { + existingPorts.Insert(svcPortName.Port) + _, exists := (*sm)[svcPortName] if !exists { - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol) + glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) } else { - glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol) + glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) } - (*sm)[serviceName] = info + (*sm)[svcPortName] = info } return existingPorts } func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { - for serviceName := range other { - if existingPorts.Has(serviceName.Port) { + for svcPortName := range other { + if existingPorts.Has(svcPortName.Port) { continue } - info, exists := (*sm)[serviceName] + info, exists := (*sm)[svcPortName] if exists { - glog.V(1).Infof("Removing service %q", serviceName) + glog.V(1).Infof("Removing service port %q", svcPortName) if info.protocol == api.ProtocolUDP { staleServices.Insert(info.clusterIP.String()) } - delete(*sm, serviceName) + delete(*sm, svcPortName) } else { - glog.Errorf("Service %q removed, but doesn't exists", serviceName) + glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) } } } func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { - for svcPort := range other { - em[svcPort] = other[svcPort] + for svcPortName := range other { + em[svcPortName] = other[svcPortName] } } func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { - for svcPort := range other { - delete(em, svcPort) + for svcPortName := range other { + delete(em, svcPortName) } } @@ -314,7 +315,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { // and services that provide the actual backends. type Proxier struct { // endpointsChanges and serviceChanges contains all changes to endpoints and - // services that happened since last syncProxyRules call. For a single object, + // services that happened since iptables was synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. endpointsChanges endpointsChangeMap @@ -330,12 +331,9 @@ type Proxier struct { endpointsSynced bool servicesSynced bool initialized int32 - - throttle flowcontrol.RateLimiter + syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules // These are effectively const and do not need the mutex to be held. - syncPeriod time.Duration - minSyncPeriod time.Duration iptables utiliptables.Interface masqueradeAll bool masqueradeMark string @@ -409,7 +407,7 @@ func NewProxier(ipt utiliptables.Interface, ) (*Proxier, error) { // check valid user input if minSyncPeriod > syncPeriod { - return nil, fmt.Errorf("min-sync (%v) must be <= sync(%v)", minSyncPeriod, syncPeriod) + return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod) } // Set the route_localnet sysctl we need for @@ -442,23 +440,12 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - var throttle flowcontrol.RateLimiter - // Defaulting back to not limit sync rate when minSyncPeriod is 0. - if minSyncPeriod != 0 { - syncsPerSecond := float32(time.Second) / float32(minSyncPeriod) - // The average use case will process 2 updates in short succession - throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2) - } - - return &Proxier{ + proxier := &Proxier{ portsMap: make(map[localPort]closeable), serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), endpointsChanges: newEndpointsChangeMap(hostname), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - throttle: throttle, iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -475,7 +462,11 @@ func NewProxier(ipt utiliptables.Interface, filterRules: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil), - }, nil + } + burstSyncs := 2 + glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + return proxier, nil } // CleanupLeftovers removes all iptables rules and chains created by the Proxier @@ -566,24 +557,18 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { return encounteredError } -// Sync is called to immediately synchronize the proxier state to iptables +// Sync is called to synchronize the proxier state to iptables as soon as possible. func (proxier *Proxier) Sync() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { - t := time.NewTicker(proxier.syncPeriod) - defer t.Stop() // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { proxier.healthzServer.UpdateTimestamp() } - for { - <-t.C - glog.V(6).Infof("Periodic sync") - proxier.Sync() - } + proxier.syncRunner.Loop(wait.NeverStop) } func (proxier *Proxier) setInitialized(value bool) { @@ -601,21 +586,21 @@ func (proxier *Proxier) isInitialized() bool { func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } @@ -624,7 +609,8 @@ func (proxier *Proxier) OnServiceSynced() { proxier.servicesSynced = true proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.mu.Unlock() - // Call it unconditionally - this is called once per lifetime. + + // Sync unconditionally - this is called once per lifetime. proxier.syncProxyRules() } @@ -662,9 +648,9 @@ func updateServiceMap( // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to serviceMap. hcServices = make(map[types.NamespacedName]uint16) - for svcPort, info := range serviceMap { + for svcPortName, info := range serviceMap { if info.healthCheckNodePort != 0 { - hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort) + hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) } } @@ -674,21 +660,21 @@ func updateServiceMap( func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { - proxier.syncProxyRules() + proxier.syncRunner.Run() } } @@ -697,7 +683,8 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.endpointsSynced = true proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.mu.Unlock() - // Call it unconditionally - this is called once per lifetime. + + // Sync unconditionally - this is called once per lifetime. proxier.syncProxyRules() } @@ -738,18 +725,18 @@ func updateEndpointsMap( // are modified by this function with detected stale // connections. func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) { - for svcPort, epList := range oldEndpointsMap { + for svcPortName, epList := range oldEndpointsMap { for _, ep := range epList { stale := true - for i := range newEndpointsMap[svcPort] { - if *newEndpointsMap[svcPort][i] == *ep { + for i := range newEndpointsMap[svcPortName] { + if *newEndpointsMap[svcPortName][i] == *ep { stale = false break } } if stale { - glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint) - staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true + glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint) + staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true } } } @@ -757,10 +744,10 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { localIPs := make(map[types.NamespacedName]sets.String) - for svcPort := range endpointsMap { - for _, ep := range endpointsMap[svcPort] { + for svcPortName := range endpointsMap { + for _, ep := range endpointsMap[svcPortName] { if ep.isLocal { - nsn := svcPort.NamespacedName + nsn := svcPortName.NamespacedName if localIPs[nsn] == nil { localIPs[nsn] = sets.NewString() } @@ -792,7 +779,7 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd glog.Warningf("ignoring invalid endpoint port %s", port.Name) continue } - svcPort := proxy.ServicePortName{ + svcPortName := proxy.ServicePortName{ NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: port.Name, } @@ -806,14 +793,14 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), isLocal: addr.NodeName != nil && *addr.NodeName == hostname, } - endpointsMap[svcPort] = append(endpointsMap[svcPort], epInfo) + endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) } if glog.V(3) { newEPList := []string{} - for _, ep := range endpointsMap[svcPort] { + for _, ep := range endpointsMap[svcPortName] { newEPList = append(newEPList, ep.endpoint) } - glog.Infof("Setting endpoints for %q to %+v", svcPort, newEPList) + glog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList) } } } @@ -835,8 +822,8 @@ func serviceToServiceMap(service *api.Service) proxyServiceMap { serviceMap := make(proxyServiceMap) for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[serviceName] = newServiceInfo(serviceName, servicePort, service) + svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) } return serviceMap } @@ -909,14 +896,11 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() -// assumes proxier.mu is held +// This assumes proxier.mu is NOT held func (proxier *Proxier) syncProxyRules() { proxier.mu.Lock() defer proxier.mu.Unlock() - if proxier.throttle != nil { - proxier.throttle.Accept() - } start := time.Now() defer func() { SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) @@ -928,10 +912,9 @@ func (proxier *Proxier) syncProxyRules() { return } - // We assume that if syncProxyRules was called, we really want to sync them, - // even if nothing changed in the meantime. In other words, caller are - // responsible for detecting no-op changes and not calling syncProxyRules in - // such cases. + // We assume that if this was called, we really want to sync them, + // even if nothing changed in the meantime. In other words, callers are + // responsible for detecting no-op changes and not calling this function. hcServices, staleServices := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) hcEndpoints, staleEndpoints := updateEndpointsMap( diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index a665c3c57fe..33f2386c467 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -21,6 +21,7 @@ import ( "reflect" "strconv" "testing" + "time" "github.com/davecgh/go-spew/spew" @@ -34,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" @@ -383,7 +385,7 @@ const testHostname = "test-hostname" func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. - return &Proxier{ + p := &Proxier{ exec: &exec.FakeExec{}, serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), @@ -401,6 +403,8 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { natChains: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil), } + p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) + return p } func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool { diff --git a/pkg/util/async/BUILD b/pkg/util/async/BUILD index 0992f20245c..67ba72fe84b 100644 --- a/pkg/util/async/BUILD +++ b/pkg/util/async/BUILD @@ -10,13 +10,23 @@ load( go_library( name = "go_default_library", - srcs = ["runner.go"], + srcs = [ + "bounded_frequency_runner.go", + "runner.go", + ], tags = ["automanaged"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], ) go_test( name = "go_default_test", - srcs = ["runner_test.go"], + srcs = [ + "bounded_frequency_runner_test.go", + "runner_test.go", + ], library = ":go_default_library", tags = ["automanaged"], ) diff --git a/pkg/util/async/bounded_frequency_runner.go b/pkg/util/async/bounded_frequency_runner.go new file mode 100644 index 00000000000..531ac2cfee6 --- /dev/null +++ b/pkg/util/async/bounded_frequency_runner.go @@ -0,0 +1,229 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package async + +import ( + "fmt" + "sync" + "time" + + "k8s.io/client-go/util/flowcontrol" + + "github.com/golang/glog" +) + +// BoundedFrequencyRunner manages runs of a user-provided function. +// See NewBoundedFrequencyRunner for examples. +type BoundedFrequencyRunner struct { + name string // the name of this instance + minInterval time.Duration // the min time between runs, modulo bursts + maxInterval time.Duration // the max time between runs + + run chan struct{} // try an async run + + mu sync.Mutex // guards runs of fn and all mutations + fn func() // function to run + lastRun time.Time // time of last run + timer timer // timer for deferred runs + limiter rateLimiter // rate limiter for on-demand runs +} + +// designed so that flowcontrol.RateLimiter satisfies +type rateLimiter interface { + TryAccept() bool + Stop() +} + +type nullLimiter struct{} + +func (nullLimiter) TryAccept() bool { + return true +} + +func (nullLimiter) Stop() {} + +var _ rateLimiter = nullLimiter{} + +// for testing +type timer interface { + // C returns the timer's selectable channel. + C() <-chan time.Time + + // See time.Timer.Reset. + Reset(d time.Duration) bool + + // See time.Timer.Stop. + Stop() bool + + // See time.Now. + Now() time.Time + + // See time.Since. + Since(t time.Time) time.Duration + + // See time.Sleep. + Sleep(d time.Duration) +} + +// implement our timer in terms of std time.Timer. +type realTimer struct { + *time.Timer +} + +func (rt realTimer) C() <-chan time.Time { + return rt.Timer.C +} + +func (rt realTimer) Now() time.Time { + return time.Now() +} + +func (rt realTimer) Since(t time.Time) time.Duration { + return time.Since(t) +} + +func (rt realTimer) Sleep(d time.Duration) { + time.Sleep(d) +} + +var _ timer = realTimer{} + +// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, +// which will manage runs of the specified function. +// +// All runs will be async to the caller of BoundedFrequencyRunner.Run, but +// multiple runs are serialized. If the function needs to hold locks, it must +// take them internally. +// +// Runs of the funtion will have at least minInterval between them (from +// completion to next start), except that up to bursts may be allowed. Burst +// runs are "accumulated" over time, one per minInterval up to burstRuns total. +// This can be used, for example, to mitigate the impact of expensive operations +// being called in response to user-initiated operations. Run requests that +// would violate the minInterval are coallesced and run at the next opportunity. +// +// The function will be run at least once per maxInterval. For example, this can +// force periodic refreshes of state in the absence of anyone calling Run. +// +// Examples: +// +// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1) +// - fn will have at least 1 second between runs +// - fn will have no more than 5 seconds between runs +// +// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3) +// - fn will have at least 3 seconds between runs, with up to 3 burst runs +// - fn will have no more than 10 seconds between runs +// +// The maxInterval must be greater than or equal to the minInterval, If the +// caller passes a maxInterval less than minInterval, this function will panic. +func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { + timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately + <-timer.C() // consume the first tick + return construct(name, fn, minInterval, maxInterval, burstRuns, timer) +} + +// Make an instance with dependencies injected. +func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner { + if maxInterval < minInterval { + panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval)) + } + if timer == nil { + panic(fmt.Sprintf("%s: timer must be non-nil", name)) + } + + bfr := &BoundedFrequencyRunner{ + name: name, + fn: fn, + minInterval: minInterval, + maxInterval: maxInterval, + run: make(chan struct{}, 16), + timer: timer, + } + if minInterval == 0 { + bfr.limiter = nullLimiter{} + } else { + // allow burst updates in short succession + qps := float32(time.Second) / float32(minInterval) + bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer) + } + return bfr +} + +// Loop handles the periodic timer and run requests. This is expected to be +// called as a goroutine. +func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { + glog.V(3).Infof("%s Loop running", bfr.name) + bfr.timer.Reset(bfr.maxInterval) + for { + select { + case <-stop: + bfr.stop() + glog.V(3).Infof("%s Loop stopping", bfr.name) + return + case <-bfr.timer.C(): + bfr.tryRun() + case <-bfr.run: + bfr.tryRun() + } + } +} + +// Run the function as soon as possible. If this is called while Loop is not +// running, the call may be deferred indefinitely. +func (bfr *BoundedFrequencyRunner) Run() { + bfr.run <- struct{}{} +} + +// assumes the lock is not held +func (bfr *BoundedFrequencyRunner) stop() { + bfr.mu.Lock() + defer bfr.mu.Unlock() + bfr.limiter.Stop() + bfr.timer.Stop() +} + +// assumes the lock is not held +func (bfr *BoundedFrequencyRunner) tryRun() { + bfr.mu.Lock() + defer bfr.mu.Unlock() + + if bfr.limiter.TryAccept() { + // We're allowed to run the function right now. + bfr.fn() + bfr.lastRun = bfr.timer.Now() + bfr.timer.Stop() + bfr.timer.Reset(bfr.maxInterval) + glog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval) + return + } + + // It can't run right now, figure out when it can run next. + + elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run + nextPossible := bfr.minInterval - elapsed // time to next possible run + nextScheduled := bfr.maxInterval - elapsed // time to next periodic run + glog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) + + if nextPossible < nextScheduled { + // Set the timer for ASAP, but don't drain here. Assuming Loop is running, + // it might get a delivery in the mean time, but that is OK. + bfr.timer.Stop() + bfr.timer.Reset(nextPossible) + glog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) + } +} diff --git a/pkg/util/async/bounded_frequency_runner_test.go b/pkg/util/async/bounded_frequency_runner_test.go new file mode 100644 index 00000000000..234ffb2a3f8 --- /dev/null +++ b/pkg/util/async/bounded_frequency_runner_test.go @@ -0,0 +1,332 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package async + +import ( + "sync" + "testing" + "time" +) + +// Track calls to the managed function. +type receiver struct { + lock sync.Mutex + run bool +} + +func (r *receiver) F() { + r.lock.Lock() + defer r.lock.Unlock() + r.run = true +} + +func (r *receiver) reset() bool { + r.lock.Lock() + defer r.lock.Unlock() + was := r.run + r.run = false + return was +} + +// A single change event in the fake timer. +type timerUpdate struct { + active bool + next time.Duration // iff active == true +} + +// Fake time. +type fakeTimer struct { + c chan time.Time + + lock sync.Mutex + now time.Time + active bool + + updated chan timerUpdate +} + +func newFakeTimer() *fakeTimer { + ft := &fakeTimer{ + c: make(chan time.Time), + updated: make(chan timerUpdate), + } + return ft +} + +func (ft *fakeTimer) C() <-chan time.Time { + return ft.c +} + +func (ft *fakeTimer) Reset(in time.Duration) bool { + ft.lock.Lock() + defer ft.lock.Unlock() + + was := ft.active + ft.active = true + ft.updated <- timerUpdate{ + active: true, + next: in, + } + return was +} + +func (ft *fakeTimer) Stop() bool { + ft.lock.Lock() + defer ft.lock.Unlock() + + was := ft.active + ft.active = false + ft.updated <- timerUpdate{ + active: false, + } + return was +} + +func (ft *fakeTimer) Now() time.Time { + ft.lock.Lock() + defer ft.lock.Unlock() + + return ft.now +} + +func (ft *fakeTimer) Since(t time.Time) time.Duration { + ft.lock.Lock() + defer ft.lock.Unlock() + + return ft.now.Sub(t) +} + +func (ft *fakeTimer) Sleep(d time.Duration) { + ft.lock.Lock() + defer ft.lock.Unlock() + + ft.advance(d) +} + +// advance the current time. +func (ft *fakeTimer) advance(d time.Duration) { + ft.lock.Lock() + defer ft.lock.Unlock() + + ft.now = ft.now.Add(d) +} + +// send a timer tick. +func (ft *fakeTimer) tick() { + ft.lock.Lock() + defer ft.lock.Unlock() + + ft.active = false + ft.c <- ft.now +} + +// return the calling line number (for printing) +// test the timer's state +func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) { + if upd.active != active { + t.Fatalf("%s: expected timer active=%v", name, active) + } + if active && upd.next != next { + t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next) + } +} + +// test and reset the receiver's state +func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) { + triggered := receiver.reset() + if expected && !triggered { + t.Fatalf("%s: function should have been called", name) + } else if !expected && triggered { + t.Fatalf("%s: function should not have been called", name) + } +} + +// Durations embedded in test cases depend on these. +var minInterval = 1 * time.Second +var maxInterval = 10 * time.Second + +func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) { + upd := <-timer.updated // wait for stop + checkReceiver(name, t, obj, expectCall) + checkReceiver(name, t, obj, false) // prove post-condition + checkTimer(name, t, upd, false, 0) + upd = <-timer.updated // wait for reset + checkTimer(name, t, upd, true, expectNext) +} + +func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { + waitForReset(name, t, timer, obj, true, maxInterval) +} + +func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { + waitForReset(name, t, timer, obj, false, expectNext) +} + +func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { + obj := &receiver{} + timer := newFakeTimer() + runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) + stop := make(chan struct{}) + + var upd timerUpdate + + // Start. + go runner.Loop(stop) + upd = <-timer.updated // wait for initial time to be set to max + checkTimer("init", t, upd, true, maxInterval) + checkReceiver("init", t, obj, false) + + // Run once, immediately. + // rel=0ms + runner.Run() + waitForRun("first run", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(500 * time.Millisecond) // rel=500ms + runner.Run() + waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(499 * time.Millisecond) // rel=999ms + runner.Run() + waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond) + + // Run again, once minInterval has passed (race with timer). + timer.advance(1 * time.Millisecond) // rel=1000ms + runner.Run() + waitForRun("second run", t, timer, obj) + + // Run again, before minInterval expires. + // rel=0ms + runner.Run() + waitForDefer("too soon after second", t, timer, obj, 1*time.Second) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // rel=1ms + runner.Run() + waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond) + + // Let the timer tick prematurely. + timer.advance(998 * time.Millisecond) // rel=999ms + timer.tick() + waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond) + + // Let the timer tick. + timer.advance(1 * time.Millisecond) // rel=1000ms + timer.tick() + waitForRun("first tick", t, timer, obj) + + // Let the timer tick. + timer.advance(10 * time.Second) // rel=10000ms + timer.tick() + waitForRun("second tick", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // rel=1ms + runner.Run() + waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond) + + // Let the timer tick. + timer.advance(999 * time.Millisecond) // rel=1000ms + timer.tick() + waitForRun("third tick", t, timer, obj) + + // Clean up. + stop <- struct{}{} +} + +func Test_BoundedFrequencyRunnerBurst(t *testing.T) { + obj := &receiver{} + timer := newFakeTimer() + runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer) + stop := make(chan struct{}) + + var upd timerUpdate + + // Start. + go runner.Loop(stop) + upd = <-timer.updated // wait for initial time to be set to max + checkTimer("init", t, upd, true, maxInterval) + checkReceiver("init", t, obj, false) + + // Run once, immediately. + // abs=0ms, rel=0ms + runner.Run() + waitForRun("first run", t, timer, obj) + + // Run again, before minInterval expires, with burst. + timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms + runner.Run() + waitForRun("second run", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms + runner.Run() + waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms + runner.Run() + waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms + runner.Run() + waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond) + + // Run again, once burst has replenished. + timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms + runner.Run() + waitForRun("third run", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms + runner.Run() + waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms + runner.Run() + waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond) + + // Run again, once burst has replenished. + timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms + runner.Run() + waitForRun("fourth run", t, timer, obj) + + // Run again, once burst has fully replenished. + timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms + runner.Run() + waitForRun("fifth run", t, timer, obj) + runner.Run() + waitForRun("sixth run", t, timer, obj) + runner.Run() + waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second) + + // Let the timer tick. + timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms + timer.tick() + waitForRun("first tick", t, timer, obj) + + // Let the timer tick. + timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms + timer.tick() + waitForRun("second tick", t, timer, obj) + + // Clean up. + stop <- struct{}{} +} diff --git a/staging/src/k8s.io/apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiserver/Godeps/Godeps.json index addbffaa66d..b47c26f6015 100644 --- a/staging/src/k8s.io/apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiserver/Godeps/Godeps.json @@ -480,7 +480,7 @@ }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" + "Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342" }, { "ImportPath": "github.com/karlseguin/ccache", diff --git a/staging/src/k8s.io/client-go/Godeps/Godeps.json b/staging/src/k8s.io/client-go/Godeps/Godeps.json index 884b82a9f17..d38718f955b 100644 --- a/staging/src/k8s.io/client-go/Godeps/Godeps.json +++ b/staging/src/k8s.io/client-go/Godeps/Godeps.json @@ -160,7 +160,7 @@ }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" + "Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342" }, { "ImportPath": "github.com/mailru/easyjson/buffer", diff --git a/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go b/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go index 881a2f57d7d..c45169c40f0 100644 --- a/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go +++ b/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go @@ -51,6 +51,22 @@ type tokenBucketRateLimiter struct { // The maximum number of tokens in the bucket is capped at 'burst'. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst)) + return newTokenBucketRateLimiter(limiter, qps) +} + +// An injectable, mockable clock interface. +type Clock interface { + ratelimit.Clock +} + +// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter +// but allows an injectable clock, for testing. +func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter { + limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock) + return newTokenBucketRateLimiter(limiter, qps) +} + +func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter { return &tokenBucketRateLimiter{ limiter: limiter, qps: qps, diff --git a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json index f3050e8720f..0d7740085d2 100644 --- a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json @@ -236,7 +236,7 @@ }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" + "Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342" }, { "ImportPath": "github.com/mailru/easyjson/buffer", diff --git a/staging/src/k8s.io/kube-apiextensions-server/Godeps/Godeps.json b/staging/src/k8s.io/kube-apiextensions-server/Godeps/Godeps.json index f9cf1d0ffc0..9fc536f2bec 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-apiextensions-server/Godeps/Godeps.json @@ -228,7 +228,7 @@ }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" + "Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342" }, { "ImportPath": "github.com/mailru/easyjson/buffer", diff --git a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json index 179f0469fd7..ddae55cc8e0 100644 --- a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json @@ -228,7 +228,7 @@ }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" + "Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342" }, { "ImportPath": "github.com/mailru/easyjson/buffer", diff --git a/vendor/github.com/emicklei/go-restful-swagger12/BUILD b/vendor/github.com/emicklei/go-restful-swagger12/BUILD index bb3f2209a7e..5e7abc48671 100644 --- a/vendor/github.com/emicklei/go-restful-swagger12/BUILD +++ b/vendor/github.com/emicklei/go-restful-swagger12/BUILD @@ -37,9 +37,6 @@ filegroup( filegroup( name = "all-srcs", - srcs = [ - ":package-srcs", - "//vendor/github.com/emicklei/go-restful-swagger12/test_package:all-srcs", - ], + srcs = [":package-srcs"], tags = ["automanaged"], ) diff --git a/vendor/github.com/emicklei/go-restful-swagger12/test_package/BUILD b/vendor/github.com/emicklei/go-restful-swagger12/test_package/BUILD deleted file mode 100644 index 6cc62052baf..00000000000 --- a/vendor/github.com/emicklei/go-restful-swagger12/test_package/BUILD +++ /dev/null @@ -1,16 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/vendor/github.com/juju/ratelimit/ratelimit.go b/vendor/github.com/juju/ratelimit/ratelimit.go index 3ef32fbcc09..1c3f25b2ec4 100644 --- a/vendor/github.com/juju/ratelimit/ratelimit.go +++ b/vendor/github.com/juju/ratelimit/ratelimit.go @@ -2,7 +2,7 @@ // Licensed under the LGPLv3 with static-linking exception. // See LICENCE file for details. -// The ratelimit package provides an efficient token bucket implementation +// Package ratelimit provides an efficient token bucket implementation // that can be used to limit the rate of arbitrary things. // See http://en.wikipedia.org/wiki/Token_bucket. package ratelimit @@ -21,6 +21,7 @@ type Bucket struct { capacity int64 quantum int64 fillInterval time.Duration + clock Clock // The mutex guards the fields following it. mu sync.Mutex @@ -33,12 +34,37 @@ type Bucket struct { availTick int64 } +// Clock is used to inject testable fakes. +type Clock interface { + Now() time.Time + Sleep(d time.Duration) +} + +// realClock implements Clock in terms of standard time functions. +type realClock struct{} + +// Now is identical to time.Now. +func (realClock) Now() time.Time { + return time.Now() +} + +// Sleep is identical to time.Sleep. +func (realClock) Sleep(d time.Duration) { + time.Sleep(d) +} + // NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { - return NewBucketWithQuantum(fillInterval, capacity, 1) + return NewBucketWithClock(fillInterval, capacity, realClock{}) +} + +// NewBucketWithClock is identical to NewBucket but injects a testable clock +// interface. +func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { + return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) } // rateMargin specifes the allowed variance of actual @@ -51,12 +77,18 @@ const rateMargin = 0.01 // at high rates, the actual rate may be up to 1% different from the // specified rate. func NewBucketWithRate(rate float64, capacity int64) *Bucket { + return NewBucketWithRateAndClock(rate, capacity, realClock{}) +} + +// NewBucketWithRateAndClock is identical to NewBucketWithRate but injects a +// testable clock interface. +func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket { for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) { fillInterval := time.Duration(1e9 * float64(quantum) / rate) if fillInterval <= 0 { continue } - tb := NewBucketWithQuantum(fillInterval, capacity, quantum) + tb := NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, clock) if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin { return tb } @@ -79,6 +111,12 @@ func nextQuantum(q int64) int64 { // the specification of the quantum size - quantum tokens // are added every fillInterval. func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket { + return NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, realClock{}) +} + +// NewBucketWithQuantumAndClock is identical to NewBucketWithQuantum but injects +// a testable clock interface. +func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } @@ -89,7 +127,8 @@ func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) * panic("token bucket quantum is not > 0") } return &Bucket{ - startTime: time.Now(), + clock: clock, + startTime: clock.Now(), capacity: capacity, quantum: quantum, avail: capacity, @@ -101,7 +140,7 @@ func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) * // available. func (tb *Bucket) Wait(count int64) { if d := tb.Take(count); d > 0 { - time.Sleep(d) + tb.clock.Sleep(d) } } @@ -113,7 +152,7 @@ func (tb *Bucket) Wait(count int64) { func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool { d, ok := tb.TakeMaxDuration(count, maxWait) if d > 0 { - time.Sleep(d) + tb.clock.Sleep(d) } return ok } @@ -127,7 +166,7 @@ const infinityDuration time.Duration = 0x7fffffffffffffff // Note that if the request is irrevocable - there is no way to return // tokens to the bucket once this method commits us to taking them. func (tb *Bucket) Take(count int64) time.Duration { - d, _ := tb.take(time.Now(), count, infinityDuration) + d, _ := tb.take(tb.clock.Now(), count, infinityDuration) return d } @@ -141,14 +180,14 @@ func (tb *Bucket) Take(count int64) time.Duration { // wait until the tokens are actually available, and reports // true. func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) { - return tb.take(time.Now(), count, maxWait) + return tb.take(tb.clock.Now(), count, maxWait) } // TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { - return tb.takeAvailable(time.Now(), count) + return tb.takeAvailable(tb.clock.Now(), count) } // takeAvailable is the internal version of TakeAvailable - it takes the @@ -178,7 +217,7 @@ func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { // tokens could have changed in the meantime. This method is intended // primarily for metrics reporting and debugging. func (tb *Bucket) Available() int64 { - return tb.available(time.Now()) + return tb.available(tb.clock.Now()) } // available is the internal version of available - it takes the current time as