Compare commits

..

9 Commits

Author SHA1 Message Date
Kubernetes Publisher
c21f1e26fb Update dependencies to v0.19.0-alpha.0 tag 2020-02-21 16:31:43 +00:00
Kubernetes Publisher
9346d68881 Merge pull request #88261 from mborsz/rate
Implement ItemBucketRateLimiter

Kubernetes-commit: 1e12d92a5179dbfeb455c79dbf9120c8536e5f9c
2020-02-18 19:23:22 +00:00
Maciej Borsz
94f6bd1785 Implement ItemBucketRateLimiter
Kubernetes-commit: 6846a0a6b62725a5888d66e2789f2af4fb172045
2020-02-18 11:24:20 +01:00
Kubernetes Publisher
a1d715839a Merge pull request #87899 from tedyu/observed-rec
Pass context to tryAcquireOrRenew

Kubernetes-commit: 1c225e3e5648d61f1dfc4f04db99712ab13ba86a
2020-02-16 05:46:14 +00:00
Kubernetes Publisher
200280e336 Merge pull request #88131 from liggitt/mutation-detector-trace
Shrink mutation detection critical section

Kubernetes-commit: c1aeef5e6279f3c3325933242de5746e72635a31
2020-02-16 05:46:13 +00:00
Jordan Liggitt
b0779d525a Shrink mutation detection critical section
Kubernetes-commit: 12abf03f6b4a60fa61773acd21dfb440ff10f699
2020-02-13 22:22:14 -05:00
Kubernetes Publisher
ce0298a60b Merge pull request #88079 from liggitt/onclose-panic
Set up connection onClose prior to adding to connection map

Kubernetes-commit: b2bf4a8acb7ac48f5428483237fff8fb722802bf
2020-02-14 08:23:08 +00:00
Jordan Liggitt
03953c1a93 Set up connection onClose prior to adding to connection map
Kubernetes-commit: aa4113d777dd6c699233e0b6d903e9734e182686
2020-02-12 11:14:22 -05:00
Ted Yu
d6bfb32c17 Pass context to tryAcquireOrRenew
Kubernetes-commit: c049f30ef2a83172f46a587ddaf2104b39df8301
2020-02-10 12:31:24 -08:00
16 changed files with 226 additions and 74 deletions

4
Godeps/Godeps.json generated
View File

@@ -348,11 +348,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "v0.18.0-alpha.4"
"Rev": "v0.19.0-alpha.0"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "v0.18.0-alpha.4"
"Rev": "v0.19.0-alpha.0"
},
{
"ImportPath": "k8s.io/gengo",

8
go.mod
View File

@@ -28,8 +28,8 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.18.0-alpha.4
k8s.io/apimachinery v0.18.0-alpha.4
k8s.io/api v0.19.0-alpha.0
k8s.io/apimachinery v0.19.0-alpha.0
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20200117235808-5f6fbceb4c31
sigs.k8s.io/yaml v1.2.0
@@ -38,6 +38,6 @@ require (
replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
k8s.io/api => k8s.io/api v0.18.0-alpha.4
k8s.io/apimachinery => k8s.io/apimachinery v0.18.0-alpha.4
k8s.io/api => k8s.io/api v0.19.0-alpha.0
k8s.io/apimachinery => k8s.io/apimachinery v0.19.0-alpha.0
)

4
go.sum
View File

@@ -182,8 +182,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.18.0-alpha.4/go.mod h1:SvC64HywGrMEvfBPw7FcGXhoKTBfkFrpbuCoS2/gdiA=
k8s.io/apimachinery v0.18.0-alpha.4/go.mod h1:5X8oEhnd931nEg6/Nkumo00nT6ZsCLp2h7Xwd7Ym6P4=
k8s.io/api v0.19.0-alpha.0/go.mod h1:gwFXFTdIdFlWazixRQwbq3p9mI191fabextJ/aOC1k8=
k8s.io/apimachinery v0.19.0-alpha.0/go.mod h1:5X8oEhnd931nEg6/Nkumo00nT6ZsCLp2h7Xwd7Ym6P4=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=

View File

@@ -68,7 +68,13 @@ type defaultCacheMutationDetector struct {
name string
period time.Duration
lock sync.Mutex
// compareLock ensures only a single call to CompareObjects runs at a time
compareObjectsLock sync.Mutex
// addLock guards addedObjs between AddObject and CompareObjects
addedObjsLock sync.Mutex
addedObjs []cacheObj
cachedObjs []cacheObj
retainDuration time.Duration
@@ -118,15 +124,22 @@ func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
if obj, ok := obj.(runtime.Object); ok {
copiedObj := obj.DeepCopyObject()
d.lock.Lock()
defer d.lock.Unlock()
d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
d.addedObjsLock.Lock()
defer d.addedObjsLock.Unlock()
d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
}
}
func (d *defaultCacheMutationDetector) CompareObjects() {
d.lock.Lock()
defer d.lock.Unlock()
d.compareObjectsLock.Lock()
defer d.compareObjectsLock.Unlock()
// move addedObjs into cachedObjs under lock
// this keeps the critical section small to avoid blocking AddObject while we compare cachedObjs
d.addedObjsLock.Lock()
d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
d.addedObjs = nil
d.addedObjsLock.Unlock()
altered := false
for i, obj := range d.cachedObjs {

View File

@@ -17,6 +17,7 @@ limitations under the License.
package leaderelection
import (
"context"
"fmt"
"testing"
"time"
@@ -32,17 +33,17 @@ type fakeLock struct {
}
// Get is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
func (fl *fakeLock) Get(ctx context.Context) (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
return nil, nil, nil
}
// Create is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Create(ler rl.LeaderElectionRecord) error {
func (fl *fakeLock) Create(ctx context.Context, ler rl.LeaderElectionRecord) error {
return nil
}
// Update is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Update(ler rl.LeaderElectionRecord) error {
func (fl *fakeLock) Update(ctx context.Context, ler rl.LeaderElectionRecord) error {
return nil
}

View File

@@ -241,7 +241,7 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
succeeded = le.tryAcquireOrRenew()
succeeded = le.tryAcquireOrRenew(ctx)
le.maybeReportTransition()
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
@@ -263,18 +263,7 @@ func (le *LeaderElector) renew(ctx context.Context) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
done := make(chan bool, 1)
go func() {
defer close(done)
done <- le.tryAcquireOrRenew()
}()
select {
case <-timeoutCtx.Done():
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
case result := <-done:
return result, nil
}
return le.tryAcquireOrRenew(timeoutCtx), nil
}, timeoutCtx.Done())
le.maybeReportTransition()
@@ -303,7 +292,7 @@ func (le *LeaderElector) release() bool {
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
}
if err := le.config.Lock.Update(leaderElectionRecord); err != nil {
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
return false
}
@@ -315,7 +304,7 @@ func (le *LeaderElector) release() bool {
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
@@ -325,13 +314,13 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}
@@ -363,7 +352,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// update the lock itself
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package leaderelection
import (
"context"
"encoding/json"
"fmt"
"sync"
@@ -296,7 +297,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
@@ -879,7 +880,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}

View File

@@ -42,10 +42,10 @@ type ConfigMapLock struct {
}
// Get returns the election record from a ConfigMap Annotation
func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
func (cml *ConfigMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
var err error
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(context.TODO(), cml.ConfigMapMeta.Name, metav1.GetOptions{})
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@@ -62,12 +62,12 @@ func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a LeaderElectionRecord annotation
func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(context.TODO(), &v1.ConfigMap{
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cml.ConfigMapMeta.Name,
Namespace: cml.ConfigMapMeta.Namespace,
@@ -80,7 +80,7 @@ func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
}
// Update will update an existing annotation on a given resource.
func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if cml.cm == nil {
return errors.New("configmap not initialized, call get or create first")
}
@@ -89,7 +89,7 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
return err
}
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(context.TODO(), cml.cm, metav1.UpdateOptions{})
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
return err
}

View File

@@ -37,10 +37,10 @@ type EndpointsLock struct {
}
// Get returns the election record from a Endpoints Annotation
func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
func (el *EndpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
var err error
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(context.TODO(), el.EndpointsMeta.Name, metav1.GetOptions{})
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@@ -57,12 +57,12 @@ func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a LeaderElectionRecord annotation
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
func (el *EndpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(context.TODO(), &v1.Endpoints{
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: el.EndpointsMeta.Name,
Namespace: el.EndpointsMeta.Namespace,
@@ -75,7 +75,7 @@ func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
}
// Update will update and existing annotation on a given resource.
func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if el.e == nil {
return errors.New("endpoint not initialized, call get or create first")
}
@@ -87,7 +87,7 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
el.e.Annotations = make(map[string]string)
}
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(context.TODO(), el.e, metav1.UpdateOptions{})
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
return err
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package resourcelock
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -73,13 +74,13 @@ type ResourceLockConfig struct {
// by the leaderelection code.
type Interface interface {
// Get returns the LeaderElectionRecord
Get() (*LeaderElectionRecord, []byte, error)
Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
// Create attempts to create a LeaderElectionRecord
Create(ler LeaderElectionRecord) error
Create(ctx context.Context, ler LeaderElectionRecord) error
// Update will update and existing LeaderElectionRecord
Update(ler LeaderElectionRecord) error
Update(ctx context.Context, ler LeaderElectionRecord) error
// RecordEvent is used to record events
RecordEvent(string)

View File

@@ -38,9 +38,9 @@ type LeaseLock struct {
}
// Get returns the election record from a Lease spec
func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(context.TODO(), ll.LeaseMeta.Name, metav1.GetOptions{})
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@@ -53,9 +53,9 @@ func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a Lease
func (ll *LeaseLock) Create(ler LeaderElectionRecord) error {
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(context.TODO(), &coordinationv1.Lease{
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: ll.LeaseMeta.Name,
Namespace: ll.LeaseMeta.Namespace,
@@ -66,13 +66,13 @@ func (ll *LeaseLock) Create(ler LeaderElectionRecord) error {
}
// Update will update an existing Lease spec.
func (ll *LeaseLock) Update(ler LeaderElectionRecord) error {
func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if ll.lease == nil {
return errors.New("lease not initialized, call get or create first")
}
ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(context.TODO(), ll.lease, metav1.UpdateOptions{})
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
return err
}

View File

@@ -18,6 +18,7 @@ package resourcelock
import (
"bytes"
"context"
"encoding/json"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -34,13 +35,13 @@ type MultiLock struct {
}
// Get returns the older election record of the lock
func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get()
func (ml *MultiLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get(ctx)
if err != nil {
return nil, nil, err
}
secondary, secondaryRaw, err := ml.Secondary.Get()
secondary, secondaryRaw, err := ml.Secondary.Get(ctx)
if err != nil {
// Lock is held by old client
if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
@@ -60,25 +61,25 @@ func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create both primary lock and secondary lock
func (ml *MultiLock) Create(ler LeaderElectionRecord) error {
err := ml.Primary.Create(ler)
func (ml *MultiLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
err := ml.Primary.Create(ctx, ler)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return ml.Secondary.Create(ler)
return ml.Secondary.Create(ctx, ler)
}
// Update will update and existing annotation on both two resources.
func (ml *MultiLock) Update(ler LeaderElectionRecord) error {
err := ml.Primary.Update(ler)
func (ml *MultiLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
err := ml.Primary.Update(ctx, ler)
if err != nil {
return err
}
_, _, err = ml.Secondary.Get()
_, _, err = ml.Secondary.Get(ctx)
if err != nil && apierrors.IsNotFound(err) {
return ml.Secondary.Create(ler)
return ml.Secondary.Create(ctx, ler)
}
return ml.Secondary.Update(ler)
return ml.Secondary.Update(ctx, ler)
}
// RecordEvent in leader election while adding meta-data

View File

@@ -77,11 +77,6 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
closable := &closableConn{Conn: conn}
// Start tracking the connection
d.mu.Lock()
d.conns[closable] = struct{}{}
d.mu.Unlock()
// When the connection is closed, remove it from the map. This will
// be no-op if the connection isn't in the map, e.g. if CloseAll()
// is called.
@@ -91,6 +86,11 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
d.mu.Unlock()
}
// Start tracking the connection
d.mu.Lock()
d.conns[closable] = struct{}{}
d.mu.Unlock()
return closable, nil
}

View File

@@ -19,6 +19,8 @@ package connrotation
import (
"context"
"net"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -50,6 +52,73 @@ func TestCloseAll(t *testing.T) {
}
}
// TestCloseAllRace ensures CloseAll works with connections being simultaneously dialed
func TestCloseAllRace(t *testing.T) {
conns := int64(0)
dialer := NewDialer(func(ctx context.Context, network, address string) (net.Conn, error) {
return closeOnlyConn{onClose: func() { atomic.AddInt64(&conns, -1) }}, nil
})
done := make(chan struct{})
wg := &sync.WaitGroup{}
// Close all as fast as we can
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
dialer.CloseAll()
}
}
}()
// Dial as fast as we can
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
if _, err := dialer.Dial("", ""); err != nil {
t.Error(err)
return
}
atomic.AddInt64(&conns, 1)
}
}
}()
// Soak to ensure no races
time.Sleep(time.Second)
// Signal completion
close(done)
// Wait for goroutines
wg.Wait()
// Ensure CloseAll ran after all dials
dialer.CloseAll()
// Expect all connections to close within 5 seconds
for start := time.Now(); time.Now().Sub(start) < 5*time.Second; time.Sleep(10 * time.Millisecond) {
// Ensure all connections were closed
if c := atomic.LoadInt64(&conns); c == 0 {
break
} else {
t.Logf("got %d open connections, want 0, will retry", c)
}
}
// Ensure all connections were closed
if c := atomic.LoadInt64(&conns); c != 0 {
t.Fatalf("got %d open connections, want 0", c)
}
}
type closeOnlyConn struct {
net.Conn
onClose func()

View File

@@ -62,6 +62,54 @@ func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
func (r *BucketRateLimiter) Forget(item interface{}) {
}
// ItemBucketRateLimiter implements a workqueue ratelimiter API using standard rate.Limiter.
// Each key is using a separate limiter.
type ItemBucketRateLimiter struct {
r rate.Limit
burst int
limitersLock sync.Mutex
limiters map[interface{}]*rate.Limiter
}
var _ RateLimiter = &ItemBucketRateLimiter{}
// NewItemBucketRateLimiter creates new ItemBucketRateLimiter instance.
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
return &ItemBucketRateLimiter{
r: r,
burst: burst,
limiters: make(map[interface{}]*rate.Limiter),
}
}
// When returns a time.Duration which we need to wait before item is processed.
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
r.limitersLock.Lock()
defer r.limitersLock.Unlock()
limiter, ok := r.limiters[item]
if !ok {
limiter = rate.NewLimiter(r.r, r.burst)
r.limiters[item] = limiter
}
return limiter.Reserve().Delay()
}
// NumRequeues returns always 0 (doesn't apply to ItemBucketRateLimiter).
func (r *ItemBucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}
// Forget removes item from the internal state.
func (r *ItemBucketRateLimiter) Forget(item interface{}) {
r.limitersLock.Lock()
defer r.limitersLock.Unlock()
delete(r.limiters, item)
}
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {

View File

@@ -19,6 +19,8 @@ package workqueue
import (
"testing"
"time"
"golang.org/x/time/rate"
)
func TestItemExponentialFailureRateLimiter(t *testing.T) {
@@ -96,6 +98,33 @@ func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {
}
func TestItemBucketRateLimiter(t *testing.T) {
limiter := NewItemBucketRateLimiter(rate.Every(100*time.Millisecond), 1)
// Use initial burst.
if got := limiter.When("one"); got != 0 {
t.Errorf("limiter.When(two) = %v; want 0", got)
}
for i := 0; i < 1000; i++ {
limiter.When("one")
}
// limiter.When should be at this point = 1000 * rate.Limit.
// We set the threshold 1s below this value to avoid race conditions.
if got, want := limiter.When("one"), 990*100*time.Millisecond; got < want {
t.Errorf("limiter.When(one) = %v; want at least %v", got, want)
}
if got := limiter.When("two"); got != 0 {
t.Errorf("limiter.When(two) = %v; want 0", got)
}
limiter.Forget("one")
// Use new budget.
if got := limiter.When("one"); got != 0 {
t.Errorf("limiter.When(two) = %v; want 0", got)
}
}
func TestItemFastSlowRateLimiter(t *testing.T) {
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)