mirror of
https://github.com/kubernetes/client-go.git
synced 2026-06-24 02:34:28 +00:00
Compare commits
9 Commits
v0.18.0-al
...
v0.19.0-al
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c21f1e26fb | ||
|
|
9346d68881 | ||
|
|
94f6bd1785 | ||
|
|
a1d715839a | ||
|
|
200280e336 | ||
|
|
b0779d525a | ||
|
|
ce0298a60b | ||
|
|
03953c1a93 | ||
|
|
d6bfb32c17 |
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@@ -348,11 +348,11 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/api",
|
||||
"Rev": "v0.18.0-alpha.4"
|
||||
"Rev": "v0.19.0-alpha.0"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery",
|
||||
"Rev": "v0.18.0-alpha.4"
|
||||
"Rev": "v0.19.0-alpha.0"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/gengo",
|
||||
|
||||
8
go.mod
8
go.mod
@@ -28,8 +28,8 @@ require (
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||
google.golang.org/appengine v1.5.0 // indirect
|
||||
k8s.io/api v0.18.0-alpha.4
|
||||
k8s.io/apimachinery v0.18.0-alpha.4
|
||||
k8s.io/api v0.19.0-alpha.0
|
||||
k8s.io/apimachinery v0.19.0-alpha.0
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/utils v0.0.0-20200117235808-5f6fbceb4c31
|
||||
sigs.k8s.io/yaml v1.2.0
|
||||
@@ -38,6 +38,6 @@ require (
|
||||
replace (
|
||||
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
|
||||
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
|
||||
k8s.io/api => k8s.io/api v0.18.0-alpha.4
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.18.0-alpha.4
|
||||
k8s.io/api => k8s.io/api v0.19.0-alpha.0
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.19.0-alpha.0
|
||||
)
|
||||
|
||||
4
go.sum
4
go.sum
@@ -182,8 +182,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.18.0-alpha.4/go.mod h1:SvC64HywGrMEvfBPw7FcGXhoKTBfkFrpbuCoS2/gdiA=
|
||||
k8s.io/apimachinery v0.18.0-alpha.4/go.mod h1:5X8oEhnd931nEg6/Nkumo00nT6ZsCLp2h7Xwd7Ym6P4=
|
||||
k8s.io/api v0.19.0-alpha.0/go.mod h1:gwFXFTdIdFlWazixRQwbq3p9mI191fabextJ/aOC1k8=
|
||||
k8s.io/apimachinery v0.19.0-alpha.0/go.mod h1:5X8oEhnd931nEg6/Nkumo00nT6ZsCLp2h7Xwd7Ym6P4=
|
||||
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
|
||||
25
tools/cache/mutation_detector.go
vendored
25
tools/cache/mutation_detector.go
vendored
@@ -68,7 +68,13 @@ type defaultCacheMutationDetector struct {
|
||||
name string
|
||||
period time.Duration
|
||||
|
||||
lock sync.Mutex
|
||||
// compareLock ensures only a single call to CompareObjects runs at a time
|
||||
compareObjectsLock sync.Mutex
|
||||
|
||||
// addLock guards addedObjs between AddObject and CompareObjects
|
||||
addedObjsLock sync.Mutex
|
||||
addedObjs []cacheObj
|
||||
|
||||
cachedObjs []cacheObj
|
||||
|
||||
retainDuration time.Duration
|
||||
@@ -118,15 +124,22 @@ func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
|
||||
if obj, ok := obj.(runtime.Object); ok {
|
||||
copiedObj := obj.DeepCopyObject()
|
||||
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
|
||||
d.addedObjsLock.Lock()
|
||||
defer d.addedObjsLock.Unlock()
|
||||
d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
|
||||
}
|
||||
}
|
||||
|
||||
func (d *defaultCacheMutationDetector) CompareObjects() {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
d.compareObjectsLock.Lock()
|
||||
defer d.compareObjectsLock.Unlock()
|
||||
|
||||
// move addedObjs into cachedObjs under lock
|
||||
// this keeps the critical section small to avoid blocking AddObject while we compare cachedObjs
|
||||
d.addedObjsLock.Lock()
|
||||
d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
|
||||
d.addedObjs = nil
|
||||
d.addedObjsLock.Unlock()
|
||||
|
||||
altered := false
|
||||
for i, obj := range d.cachedObjs {
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package leaderelection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -32,17 +33,17 @@ type fakeLock struct {
|
||||
}
|
||||
|
||||
// Get is a dummy to allow us to have a fakeLock for testing.
|
||||
func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
|
||||
func (fl *fakeLock) Get(ctx context.Context) (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// Create is a dummy to allow us to have a fakeLock for testing.
|
||||
func (fl *fakeLock) Create(ler rl.LeaderElectionRecord) error {
|
||||
func (fl *fakeLock) Create(ctx context.Context, ler rl.LeaderElectionRecord) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update is a dummy to allow us to have a fakeLock for testing.
|
||||
func (fl *fakeLock) Update(ler rl.LeaderElectionRecord) error {
|
||||
func (fl *fakeLock) Update(ctx context.Context, ler rl.LeaderElectionRecord) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -241,7 +241,7 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
|
||||
desc := le.config.Lock.Describe()
|
||||
klog.Infof("attempting to acquire leader lease %v...", desc)
|
||||
wait.JitterUntil(func() {
|
||||
succeeded = le.tryAcquireOrRenew()
|
||||
succeeded = le.tryAcquireOrRenew(ctx)
|
||||
le.maybeReportTransition()
|
||||
if !succeeded {
|
||||
klog.V(4).Infof("failed to acquire lease %v", desc)
|
||||
@@ -263,18 +263,7 @@ func (le *LeaderElector) renew(ctx context.Context) {
|
||||
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
|
||||
defer timeoutCancel()
|
||||
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
|
||||
done := make(chan bool, 1)
|
||||
go func() {
|
||||
defer close(done)
|
||||
done <- le.tryAcquireOrRenew()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-timeoutCtx.Done():
|
||||
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
|
||||
case result := <-done:
|
||||
return result, nil
|
||||
}
|
||||
return le.tryAcquireOrRenew(timeoutCtx), nil
|
||||
}, timeoutCtx.Done())
|
||||
|
||||
le.maybeReportTransition()
|
||||
@@ -303,7 +292,7 @@ func (le *LeaderElector) release() bool {
|
||||
leaderElectionRecord := rl.LeaderElectionRecord{
|
||||
LeaderTransitions: le.observedRecord.LeaderTransitions,
|
||||
}
|
||||
if err := le.config.Lock.Update(leaderElectionRecord); err != nil {
|
||||
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
|
||||
klog.Errorf("Failed to release lock: %v", err)
|
||||
return false
|
||||
}
|
||||
@@ -315,7 +304,7 @@ func (le *LeaderElector) release() bool {
|
||||
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
|
||||
// else it tries to renew the lease if it has already been acquired. Returns true
|
||||
// on success else returns false.
|
||||
func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
|
||||
now := metav1.Now()
|
||||
leaderElectionRecord := rl.LeaderElectionRecord{
|
||||
HolderIdentity: le.config.Lock.Identity(),
|
||||
@@ -325,13 +314,13 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||
}
|
||||
|
||||
// 1. obtain or create the ElectionRecord
|
||||
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
|
||||
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
|
||||
if err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
|
||||
return false
|
||||
}
|
||||
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
|
||||
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
|
||||
klog.Errorf("error initially creating leader election record: %v", err)
|
||||
return false
|
||||
}
|
||||
@@ -363,7 +352,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||
}
|
||||
|
||||
// update the lock itself
|
||||
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
|
||||
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
|
||||
klog.Errorf("Failed to update lock: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package leaderelection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -296,7 +297,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
|
||||
observedTime: test.observedTime,
|
||||
clock: clock.RealClock{},
|
||||
}
|
||||
if test.expectSuccess != le.tryAcquireOrRenew() {
|
||||
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
|
||||
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
|
||||
}
|
||||
|
||||
@@ -879,7 +880,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
|
||||
observedTime: test.observedTime,
|
||||
clock: clock.RealClock{},
|
||||
}
|
||||
if test.expectSuccess != le.tryAcquireOrRenew() {
|
||||
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
|
||||
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
|
||||
}
|
||||
|
||||
|
||||
@@ -42,10 +42,10 @@ type ConfigMapLock struct {
|
||||
}
|
||||
|
||||
// Get returns the election record from a ConfigMap Annotation
|
||||
func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
func (cml *ConfigMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
|
||||
var record LeaderElectionRecord
|
||||
var err error
|
||||
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(context.TODO(), cml.ConfigMapMeta.Name, metav1.GetOptions{})
|
||||
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -62,12 +62,12 @@ func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
}
|
||||
|
||||
// Create attempts to create a LeaderElectionRecord annotation
|
||||
func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
|
||||
func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
recordBytes, err := json.Marshal(ler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(context.TODO(), &v1.ConfigMap{
|
||||
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: cml.ConfigMapMeta.Name,
|
||||
Namespace: cml.ConfigMapMeta.Namespace,
|
||||
@@ -80,7 +80,7 @@ func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
|
||||
}
|
||||
|
||||
// Update will update an existing annotation on a given resource.
|
||||
func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
|
||||
func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
if cml.cm == nil {
|
||||
return errors.New("configmap not initialized, call get or create first")
|
||||
}
|
||||
@@ -89,7 +89,7 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
|
||||
return err
|
||||
}
|
||||
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
|
||||
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(context.TODO(), cml.cm, metav1.UpdateOptions{})
|
||||
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -37,10 +37,10 @@ type EndpointsLock struct {
|
||||
}
|
||||
|
||||
// Get returns the election record from a Endpoints Annotation
|
||||
func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
func (el *EndpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
|
||||
var record LeaderElectionRecord
|
||||
var err error
|
||||
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(context.TODO(), el.EndpointsMeta.Name, metav1.GetOptions{})
|
||||
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -57,12 +57,12 @@ func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
}
|
||||
|
||||
// Create attempts to create a LeaderElectionRecord annotation
|
||||
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
|
||||
func (el *EndpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
recordBytes, err := json.Marshal(ler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(context.TODO(), &v1.Endpoints{
|
||||
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: el.EndpointsMeta.Name,
|
||||
Namespace: el.EndpointsMeta.Namespace,
|
||||
@@ -75,7 +75,7 @@ func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
|
||||
}
|
||||
|
||||
// Update will update and existing annotation on a given resource.
|
||||
func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
|
||||
func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
if el.e == nil {
|
||||
return errors.New("endpoint not initialized, call get or create first")
|
||||
}
|
||||
@@ -87,7 +87,7 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
|
||||
el.e.Annotations = make(map[string]string)
|
||||
}
|
||||
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
|
||||
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(context.TODO(), el.e, metav1.UpdateOptions{})
|
||||
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package resourcelock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -73,13 +74,13 @@ type ResourceLockConfig struct {
|
||||
// by the leaderelection code.
|
||||
type Interface interface {
|
||||
// Get returns the LeaderElectionRecord
|
||||
Get() (*LeaderElectionRecord, []byte, error)
|
||||
Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
|
||||
|
||||
// Create attempts to create a LeaderElectionRecord
|
||||
Create(ler LeaderElectionRecord) error
|
||||
Create(ctx context.Context, ler LeaderElectionRecord) error
|
||||
|
||||
// Update will update and existing LeaderElectionRecord
|
||||
Update(ler LeaderElectionRecord) error
|
||||
Update(ctx context.Context, ler LeaderElectionRecord) error
|
||||
|
||||
// RecordEvent is used to record events
|
||||
RecordEvent(string)
|
||||
|
||||
@@ -38,9 +38,9 @@ type LeaseLock struct {
|
||||
}
|
||||
|
||||
// Get returns the election record from a Lease spec
|
||||
func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
|
||||
var err error
|
||||
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(context.TODO(), ll.LeaseMeta.Name, metav1.GetOptions{})
|
||||
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -53,9 +53,9 @@ func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
}
|
||||
|
||||
// Create attempts to create a Lease
|
||||
func (ll *LeaseLock) Create(ler LeaderElectionRecord) error {
|
||||
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
var err error
|
||||
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(context.TODO(), &coordinationv1.Lease{
|
||||
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: ll.LeaseMeta.Name,
|
||||
Namespace: ll.LeaseMeta.Namespace,
|
||||
@@ -66,13 +66,13 @@ func (ll *LeaseLock) Create(ler LeaderElectionRecord) error {
|
||||
}
|
||||
|
||||
// Update will update an existing Lease spec.
|
||||
func (ll *LeaseLock) Update(ler LeaderElectionRecord) error {
|
||||
func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
if ll.lease == nil {
|
||||
return errors.New("lease not initialized, call get or create first")
|
||||
}
|
||||
ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
|
||||
var err error
|
||||
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(context.TODO(), ll.lease, metav1.UpdateOptions{})
|
||||
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ package resourcelock
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -34,13 +35,13 @@ type MultiLock struct {
|
||||
}
|
||||
|
||||
// Get returns the older election record of the lock
|
||||
func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
primary, primaryRaw, err := ml.Primary.Get()
|
||||
func (ml *MultiLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
|
||||
primary, primaryRaw, err := ml.Primary.Get(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
secondary, secondaryRaw, err := ml.Secondary.Get()
|
||||
secondary, secondaryRaw, err := ml.Secondary.Get(ctx)
|
||||
if err != nil {
|
||||
// Lock is held by old client
|
||||
if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
|
||||
@@ -60,25 +61,25 @@ func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
|
||||
}
|
||||
|
||||
// Create attempts to create both primary lock and secondary lock
|
||||
func (ml *MultiLock) Create(ler LeaderElectionRecord) error {
|
||||
err := ml.Primary.Create(ler)
|
||||
func (ml *MultiLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
err := ml.Primary.Create(ctx, ler)
|
||||
if err != nil && !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
return ml.Secondary.Create(ler)
|
||||
return ml.Secondary.Create(ctx, ler)
|
||||
}
|
||||
|
||||
// Update will update and existing annotation on both two resources.
|
||||
func (ml *MultiLock) Update(ler LeaderElectionRecord) error {
|
||||
err := ml.Primary.Update(ler)
|
||||
func (ml *MultiLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
|
||||
err := ml.Primary.Update(ctx, ler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, _, err = ml.Secondary.Get()
|
||||
_, _, err = ml.Secondary.Get(ctx)
|
||||
if err != nil && apierrors.IsNotFound(err) {
|
||||
return ml.Secondary.Create(ler)
|
||||
return ml.Secondary.Create(ctx, ler)
|
||||
}
|
||||
return ml.Secondary.Update(ler)
|
||||
return ml.Secondary.Update(ctx, ler)
|
||||
}
|
||||
|
||||
// RecordEvent in leader election while adding meta-data
|
||||
|
||||
@@ -77,11 +77,6 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
|
||||
|
||||
closable := &closableConn{Conn: conn}
|
||||
|
||||
// Start tracking the connection
|
||||
d.mu.Lock()
|
||||
d.conns[closable] = struct{}{}
|
||||
d.mu.Unlock()
|
||||
|
||||
// When the connection is closed, remove it from the map. This will
|
||||
// be no-op if the connection isn't in the map, e.g. if CloseAll()
|
||||
// is called.
|
||||
@@ -91,6 +86,11 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
// Start tracking the connection
|
||||
d.mu.Lock()
|
||||
d.conns[closable] = struct{}{}
|
||||
d.mu.Unlock()
|
||||
|
||||
return closable, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,8 @@ package connrotation
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -50,6 +52,73 @@ func TestCloseAll(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestCloseAllRace ensures CloseAll works with connections being simultaneously dialed
|
||||
func TestCloseAllRace(t *testing.T) {
|
||||
conns := int64(0)
|
||||
dialer := NewDialer(func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return closeOnlyConn{onClose: func() { atomic.AddInt64(&conns, -1) }}, nil
|
||||
})
|
||||
|
||||
done := make(chan struct{})
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
// Close all as fast as we can
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
dialer.CloseAll()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Dial as fast as we can
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
if _, err := dialer.Dial("", ""); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
atomic.AddInt64(&conns, 1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Soak to ensure no races
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// Signal completion
|
||||
close(done)
|
||||
// Wait for goroutines
|
||||
wg.Wait()
|
||||
// Ensure CloseAll ran after all dials
|
||||
dialer.CloseAll()
|
||||
|
||||
// Expect all connections to close within 5 seconds
|
||||
for start := time.Now(); time.Now().Sub(start) < 5*time.Second; time.Sleep(10 * time.Millisecond) {
|
||||
// Ensure all connections were closed
|
||||
if c := atomic.LoadInt64(&conns); c == 0 {
|
||||
break
|
||||
} else {
|
||||
t.Logf("got %d open connections, want 0, will retry", c)
|
||||
}
|
||||
}
|
||||
// Ensure all connections were closed
|
||||
if c := atomic.LoadInt64(&conns); c != 0 {
|
||||
t.Fatalf("got %d open connections, want 0", c)
|
||||
}
|
||||
}
|
||||
|
||||
type closeOnlyConn struct {
|
||||
net.Conn
|
||||
onClose func()
|
||||
|
||||
@@ -62,6 +62,54 @@ func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
|
||||
func (r *BucketRateLimiter) Forget(item interface{}) {
|
||||
}
|
||||
|
||||
// ItemBucketRateLimiter implements a workqueue ratelimiter API using standard rate.Limiter.
|
||||
// Each key is using a separate limiter.
|
||||
type ItemBucketRateLimiter struct {
|
||||
r rate.Limit
|
||||
burst int
|
||||
|
||||
limitersLock sync.Mutex
|
||||
limiters map[interface{}]*rate.Limiter
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemBucketRateLimiter{}
|
||||
|
||||
// NewItemBucketRateLimiter creates new ItemBucketRateLimiter instance.
|
||||
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
|
||||
return &ItemBucketRateLimiter{
|
||||
r: r,
|
||||
burst: burst,
|
||||
limiters: make(map[interface{}]*rate.Limiter),
|
||||
}
|
||||
}
|
||||
|
||||
// When returns a time.Duration which we need to wait before item is processed.
|
||||
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
|
||||
r.limitersLock.Lock()
|
||||
defer r.limitersLock.Unlock()
|
||||
|
||||
limiter, ok := r.limiters[item]
|
||||
if !ok {
|
||||
limiter = rate.NewLimiter(r.r, r.burst)
|
||||
r.limiters[item] = limiter
|
||||
}
|
||||
|
||||
return limiter.Reserve().Delay()
|
||||
}
|
||||
|
||||
// NumRequeues returns always 0 (doesn't apply to ItemBucketRateLimiter).
|
||||
func (r *ItemBucketRateLimiter) NumRequeues(item interface{}) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Forget removes item from the internal state.
|
||||
func (r *ItemBucketRateLimiter) Forget(item interface{}) {
|
||||
r.limitersLock.Lock()
|
||||
defer r.limitersLock.Unlock()
|
||||
|
||||
delete(r.limiters, item)
|
||||
}
|
||||
|
||||
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
|
||||
// dealing with max failures and expiration are up to the caller
|
||||
type ItemExponentialFailureRateLimiter struct {
|
||||
|
||||
@@ -19,6 +19,8 @@ package workqueue
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func TestItemExponentialFailureRateLimiter(t *testing.T) {
|
||||
@@ -96,6 +98,33 @@ func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestItemBucketRateLimiter(t *testing.T) {
|
||||
limiter := NewItemBucketRateLimiter(rate.Every(100*time.Millisecond), 1)
|
||||
|
||||
// Use initial burst.
|
||||
if got := limiter.When("one"); got != 0 {
|
||||
t.Errorf("limiter.When(two) = %v; want 0", got)
|
||||
}
|
||||
for i := 0; i < 1000; i++ {
|
||||
limiter.When("one")
|
||||
}
|
||||
// limiter.When should be at this point = 1000 * rate.Limit.
|
||||
// We set the threshold 1s below this value to avoid race conditions.
|
||||
if got, want := limiter.When("one"), 990*100*time.Millisecond; got < want {
|
||||
t.Errorf("limiter.When(one) = %v; want at least %v", got, want)
|
||||
}
|
||||
|
||||
if got := limiter.When("two"); got != 0 {
|
||||
t.Errorf("limiter.When(two) = %v; want 0", got)
|
||||
}
|
||||
|
||||
limiter.Forget("one")
|
||||
// Use new budget.
|
||||
if got := limiter.When("one"); got != 0 {
|
||||
t.Errorf("limiter.When(two) = %v; want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemFastSlowRateLimiter(t *testing.T) {
|
||||
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user