Compare commits

...

10 Commits

Author SHA1 Message Date
Kubernetes Publisher
f0ffbb3aea Update dependencies to v0.18.13 tag 2020-12-09 21:14:57 +00:00
Kubernetes Publisher
3214b668d1 Merge pull request #95964 from dprotaso/automated-cherry-pick-of-#95939-upstream-release-1.18
Automated cherry pick of #95939: Address scenario where releasing a resource lock fails if a prior update fails or gets cancelled

Kubernetes-commit: f9925902620ae4c375b3f16e58794ae6a96d4cb6
2020-11-06 00:34:21 +00:00
Dave Protasowski
36ad760117 Re-add the event recorder in the release test
Previously, having a mock recorder would cause panics since the
lock would be set to nil on update failures. Now the recorder
uses the cached lock.

Kubernetes-commit: b7e3af7579fb9f79d3304ba78f73f80efde805ae
2020-10-27 22:45:33 -04:00
Dave Protasowski
89beb5b8df Don't clear the cached resourcelock when errors occur on updates
This allows the lock to be released normally, even with a
potentially stale lock. This flow should only occur when we're
the lease holders.

Kubernetes-commit: 04170f66529f98367bc0d91f9419962948f95188
2020-10-27 22:41:39 -04:00
Dave Protasowski
2f053ea2ff Add failing test showing release is not working properly
Kubernetes-commit: 9988d73c9157cd5bb14dfaf0b8411d6c845df153
2020-10-27 22:29:22 -04:00
Kubernetes Publisher
c301108be7 Merge pull request #95927 from dprotaso/automated-cherry-pick-of-#80954-upstream-release-1.18
Automated cherry pick of #80954: Generate complete leader election record to resolve leader election issues with LeaseLocks

Kubernetes-commit: 3d7136e9c51f31df37e2f5151c226941be67f4ba
2020-10-28 08:21:57 -07:00
Zachary Seguin
5053776c1e Add lease release tests in leader election
Kubernetes-commit: 9030fc47a24f630770b6966bc2b08153c57b4cc8
2020-04-22 18:52:44 -04:00
Zachary Seguin
228a223128 Generate complete leader election record to resolve leader election issues with LeaseLocks
Kubernetes-commit: dfa528ed0bd94cd603366d7a40fc880d4ab1af63
2019-08-03 17:40:03 -04:00
Kubernetes Publisher
0716e817fc Merge pull request #95619 from roycaihw/automated-cherry-pick-of-#95427-upstream-release-1.18
Automated cherry pick of #95427: don't cache transports for incomparable configs

Kubernetes-commit: 0036586379f5b9958017200798c4bb7ae7d85519
2020-10-16 05:41:25 -07:00
Haowei Cai
7e1165d068 don't cache transports for incomparable configs
Co-authored-by: Jordan Liggitt <liggitt@google.com>

Kubernetes-commit: 20b77693a2d12f7f8e75c66b7a751fd0613ca465
2020-10-07 15:44:27 -07:00
10 changed files with 361 additions and 41 deletions

Godeps/Godeps.json (generated), 4 changed lines

@@ -352,11 +352,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "4352ece35dc9"
"Rev": "v0.18.13"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "1a4218b8b919"
"Rev": "v0.18.13"
},
{
"ImportPath": "k8s.io/gengo",

go.mod, 8 changed lines

@@ -28,8 +28,8 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.0.0-20200903132205-4352ece35dc9
k8s.io/apimachinery v0.0.0-20200903131814-1a4218b8b919
k8s.io/api v0.18.13
k8s.io/apimachinery v0.18.13
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
sigs.k8s.io/yaml v1.2.0
@@ -38,6 +38,6 @@ require (
replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
k8s.io/api => k8s.io/api v0.0.0-20200903132205-4352ece35dc9
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200903131814-1a4218b8b919
k8s.io/api => k8s.io/api v0.18.13
k8s.io/apimachinery => k8s.io/apimachinery v0.18.13
)

go.sum, 4 changed lines

@@ -184,8 +184,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20200903132205-4352ece35dc9/go.mod h1:pfD5vQS9vnO4IDjaPeU+KovTfbFrRz2VKg7/PvdLUS0=
k8s.io/apimachinery v0.0.0-20200903131814-1a4218b8b919/go.mod h1:PF5taHbXgTEJLU+xMypMmYTXTWPJ5LaW8bfsisxnEXk=
k8s.io/api v0.18.13/go.mod h1:E2EzRe1sNf85qoqgW9luffn9QKixXBNkNw+VKSCzlVg=
k8s.io/apimachinery v0.18.13/go.mod h1:PF5taHbXgTEJLU+xMypMmYTXTWPJ5LaW8bfsisxnEXk=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=

tools/leaderelection/leaderelection.go

@@ -289,8 +289,12 @@ func (le *LeaderElector) release() bool {
if !le.IsLeader() {
return true
}
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaseDurationSeconds: 1,
RenewTime: now,
AcquireTime: now,
}
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
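
For orientation, release() only runs when ReleaseOnCancel is set and the elector's context is cancelled; this patch makes the final record it writes complete enough for Lease-based locks. A minimal caller-side sketch of that configuration (the client-go identifiers are real; the function and lock variable are hypothetical):

package main

import (
	"context"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runElection blocks until ctx is cancelled. With ReleaseOnCancel set,
// cancellation drives the release() path patched above, handing the lease back.
func runElection(ctx context.Context, lock rl.Interface) {
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* work while leading */ },
			OnStoppedLeading: func() { /* leadership lost or released */ },
		},
	})
}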

tools/leaderelection/leaderelection_test.go

@@ -917,3 +917,284 @@ func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "configmapsleases")
}
func testReleaseLease(t *testing.T, objectType string) {
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedTime time.Time
reactors []Reactor
expectSuccess bool
transitionLeader bool
outHolder string
}{
{
name: "release acquired lock from no object",
reactors: []Reactor{
{
verb: "get",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
{
verb: "update",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
expectSuccess: true,
outHolder: "",
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
switch objectType {
case "endpoints":
lock = &rl.EndpointsLock{
EndpointsMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
}
case "configmaps":
lock = &rl.ConfigMapLock{
ConfigMapMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
}
case "leases":
lock = &rl.LeaseLock{
LeaseMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoordinationV1(),
}
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
}
observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if !le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
}
le.maybeReportTransition()
// Wait for a response to the leader transition, and add 1 so that we can track the final transition.
wg.Wait()
wg.Add(1)
if test.expectSuccess != le.release() {
t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("wrong number of api interactions")
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("leader should have transitioned but did not")
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("leader should not have transitioned but did")
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
}
})
}
}
// Will test leader election using endpoints as the resource
func TestReleaseLeaseEndpoints(t *testing.T) {
testReleaseLease(t, "endpoints")
}
// Will test leader election using configmaps as the resource
func TestReleaseLeaseConfigMaps(t *testing.T) {
testReleaseLease(t, "configmaps")
}
// Will test leader election using leases as the resource
func TestReleaseLeaseLeases(t *testing.T) {
testReleaseLease(t, "leases")
}
func TestReleaseOnCancellation_Endpoints(t *testing.T) {
testReleaseOnCancellation(t, "endpoints")
}
func TestReleaseOnCancellation_ConfigMaps(t *testing.T) {
testReleaseOnCancellation(t, "configmaps")
}
func TestReleaseOnCancellation_Leases(t *testing.T) {
testReleaseOnCancellation(t, "leases")
}
func testReleaseOnCancellation(t *testing.T, objectType string) {
var (
onNewLeader = make(chan struct{})
onRenewCalled = make(chan struct{})
onRenewResume = make(chan struct{})
onRelease = make(chan struct{})
lockObj runtime.Object
updates int
)
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
c := &fake.Clientset{}
c.AddReactor("get", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
if lockObj != nil {
return true, lockObj, nil
}
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
})
// create lock
c.AddReactor("create", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockObj = action.(fakeclient.CreateAction).GetObject()
return true, lockObj, nil
})
c.AddReactor("update", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
// Second update (first renew) should return our canceled error
// FakeClient doesn't do anything with the context so we're doing this ourselves
if updates == 2 {
close(onRenewCalled)
<-onRenewResume
return true, nil, context.Canceled
} else if updates == 3 {
close(onRelease)
}
lockObj = action.(fakeclient.UpdateAction).GetObject()
return true, lockObj, nil
})
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
if err != nil {
t.Fatal("resourcelock.New() = ", err)
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 2 * time.Second,
RetryPeriod: 1 * time.Second,
// This is what we're testing
ReleaseOnCancel: true,
Callbacks: LeaderCallbacks{
OnNewLeader: func(identity string) {},
OnStoppedLeading: func() {},
OnStartedLeading: func(context.Context) {
close(onNewLeader)
},
},
}
elector, err := NewLeaderElector(lec)
if err != nil {
t.Fatal("Failed to create leader elector: ", err)
}
ctx, cancel := context.WithCancel(context.Background())
go elector.Run(ctx)
// Wait for us to become the leader
select {
case <-onNewLeader:
case <-time.After(10 * time.Second):
t.Fatal("failed to become the leader")
}
// Wait for renew (update) to be invoked
select {
case <-onRenewCalled:
case <-time.After(10 * time.Second):
t.Fatal("the elector failed to renew the lock")
}
// Cancel the context - stopping the elector while
// it's running
cancel()
// Resume the update call to return the cancellation
// which should trigger the release flow
close(onRenewResume)
select {
case <-onRelease:
case <-time.After(10 * time.Second):
t.Fatal("the lock was not released")
}
}
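
The cancellation this test simulates is, in practice, a process shutdown. One way a caller might wire that up (a sketch assuming a SIGTERM-driven shutdown; the helper name is hypothetical):

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

// shutdownContext returns a context that is cancelled on SIGINT or SIGTERM.
// Cancelling the elector's context is what triggers the ReleaseOnCancel flow
// exercised by testReleaseOnCancellation above.
func shutdownContext(parent context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigs
		cancel()
	}()
	return ctx, cancel
}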

tools/leaderelection/resourcelock/configmaplock.go

@@ -92,8 +92,12 @@ func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord)
cml.cm.Annotations = make(map[string]string)
}
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
return err
cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
if err != nil {
return err
}
cml.cm = cm
return nil
}
// RecordEvent in leader election while adding meta-data

tools/leaderelection/resourcelock/endpointslock.go

@@ -87,8 +87,12 @@ func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) e
el.e.Annotations = make(map[string]string)
}
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
return err
e, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
if err != nil {
return err
}
el.e = e
return nil
}
// RecordEvent in leader election while adding meta-data

tools/leaderelection/resourcelock/leaselock.go

@@ -71,9 +71,14 @@ func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error
return errors.New("lease not initialized, call get or create first")
}
ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
return err
lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
if err != nil {
return err
}
ll.lease = lease
return nil
}
// RecordEvent in leader election while adding meta-data
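
All three resourcelock implementations (ConfigMapLock, EndpointsLock, LeaseLock) get the same treatment: the cached object is replaced only when the server update succeeds, so a later release() still has a usable, if possibly stale, copy. A condensed, self-contained sketch of that pattern (hypothetical types, not the client-go code itself):

package lockpattern

import (
	"context"
	"errors"
)

// object stands in for the API object backing the lock (Lease, ConfigMap, Endpoints).
type object struct{ holder string }

// updater stands in for the typed API client; Update may return (nil, err).
type updater interface {
	Update(ctx context.Context, o *object) (*object, error)
}

// lock mirrors the shape of the resourcelock implementations.
type lock struct {
	client updater
	cached *object // last object successfully read or written
}

// Update keeps the cached object on failure instead of clobbering it with nil,
// so a failed or cancelled renew cannot break a subsequent release.
func (l *lock) Update(ctx context.Context, o *object) error {
	if l.cached == nil {
		return errors.New("lock not initialized, call get or create first")
	}
	updated, err := l.client.Update(ctx, o)
	if err != nil {
		return err // l.cached stays as-is
	}
	l.cached = updated
	return nil
}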

transport/cache.go

@@ -47,10 +47,8 @@ type tlsCacheKey struct {
keyData string
certFile string
keyFile string
getCert string
serverName string
nextProtos string
dial string
disableCompression bool
}
@@ -59,22 +57,24 @@ func (t tlsCacheKey) String() string {
if len(t.keyData) > 0 {
keyText = "<redacted>"
}
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, getCert: %s, serverName:%s, dial:%s disableCompression:%t", t.insecure, t.caData, t.certData, keyText, t.getCert, t.serverName, t.dial, t.disableCompression)
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, serverName:%s, disableCompression:%t", t.insecure, t.caData, t.certData, keyText, t.serverName, t.disableCompression)
}
func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
key, err := tlsConfigKey(config)
key, canCache, err := tlsConfigKey(config)
if err != nil {
return nil, err
}
// Ensure we only create a single transport for the given TLS options
c.mu.Lock()
defer c.mu.Unlock()
if canCache {
// Ensure we only create a single transport for the given TLS options
c.mu.Lock()
defer c.mu.Unlock()
// See if we already have a custom transport for this config
if t, ok := c.transports[key]; ok {
return t, nil
// See if we already have a custom transport for this config
if t, ok := c.transports[key]; ok {
return t, nil
}
}
// Get the TLS options for this client config
@@ -104,8 +104,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
go dynamicCertDialer.Run(wait.NeverStop)
}
// Cache a single transport for these options
c.transports[key] = utilnet.SetTransportDefaults(&http.Transport{
transport := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
@@ -113,22 +112,32 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
DialContext: dial,
DisableCompression: config.DisableCompression,
})
return c.transports[key], nil
if canCache {
// Cache a single transport for these options
c.transports[key] = transport
}
return transport, nil
}
// tlsConfigKey returns a unique key for tls.Config objects returned from TLSConfigFor
func tlsConfigKey(c *Config) (tlsCacheKey, error) {
func tlsConfigKey(c *Config) (tlsCacheKey, bool, error) {
// Make sure ca/key/cert content is loaded
if err := loadTLSFiles(c); err != nil {
return tlsCacheKey{}, err
return tlsCacheKey{}, false, err
}
if c.TLS.GetCert != nil || c.Dial != nil {
// cannot determine equality for functions
return tlsCacheKey{}, false, nil
}
k := tlsCacheKey{
insecure: c.TLS.Insecure,
caData: string(c.TLS.CAData),
getCert: fmt.Sprintf("%p", c.TLS.GetCert),
serverName: c.TLS.ServerName,
nextProtos: strings.Join(c.TLS.NextProtos, ","),
dial: fmt.Sprintf("%p", c.Dial),
disableCompression: c.DisableCompression,
}
@@ -140,5 +149,5 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) {
k.keyData = string(c.TLS.KeyData)
}
return k, nil
return k, true, nil
}
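
The canCache flag exists because Go function values have no useful notion of equality: two Configs carrying a GetCert or Dial callback cannot be compared reliably, and the old %p-formatted pointer key could alias or miss entries. A small self-contained sketch of the decision (simplified, hypothetical types; not the transport package itself):

package cachekey

import "context"

// config is a stripped-down stand-in for transport.Config.
type config struct {
	serverName string
	insecure   bool
	getCert    func() ([]byte, error)                                // function-valued: not comparable
	dial       func(ctx context.Context, network, addr string) error // function-valued: not comparable
}

// key is comparable and therefore safe to use as a map key.
type key struct {
	serverName string
	insecure   bool
}

// keyFor mirrors the patched tlsConfigKey: configs carrying function fields
// opt out of caching rather than being folded into an unreliable key.
func keyFor(c *config) (k key, canCache bool) {
	if c.getCert != nil || c.dial != nil {
		// cannot determine equality for functions
		return key{}, false
	}
	return key{serverName: c.serverName, insecure: c.insecure}, true
}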

transport/cache_test.go

@@ -36,16 +36,24 @@ func TestTLSConfigKey(t *testing.T) {
}
for nameA, valueA := range identicalConfigurations {
for nameB, valueB := range identicalConfigurations {
keyA, err := tlsConfigKey(valueA)
keyA, canCache, err := tlsConfigKey(valueA)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameA, err)
continue
}
keyB, err := tlsConfigKey(valueB)
if !canCache {
t.Errorf("Unexpected canCache=false")
continue
}
keyB, canCache, err := tlsConfigKey(valueB)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameB, err)
continue
}
if !canCache {
t.Errorf("Unexpected canCache=false")
continue
}
if keyA != keyB {
t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
@@ -131,12 +139,12 @@ func TestTLSConfigKey(t *testing.T) {
}
for nameA, valueA := range uniqueConfigurations {
for nameB, valueB := range uniqueConfigurations {
keyA, err := tlsConfigKey(valueA)
keyA, canCacheA, err := tlsConfigKey(valueA)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameA, err)
continue
}
keyB, err := tlsConfigKey(valueB)
keyB, canCacheB, err := tlsConfigKey(valueB)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameB, err)
continue
@@ -147,12 +155,17 @@ func TestTLSConfigKey(t *testing.T) {
if keyA != keyB {
t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
}
if canCacheA != canCacheB {
t.Errorf("Expected identical canCache %q and %q, got:\n\t%v\n\t%v", nameA, nameB, canCacheA, canCacheB)
}
continue
}
if keyA == keyB {
t.Errorf("Expected unique cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
if canCacheA && canCacheB {
if keyA == keyB {
t.Errorf("Expected unique cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
}
}
}
}