diff --git a/tools/leaderelection/healthzadaptor.go b/tools/leaderelection/healthzadaptor.go new file mode 100644 index 00000000..b9353729 --- /dev/null +++ b/tools/leaderelection/healthzadaptor.go @@ -0,0 +1,69 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "net/http" + "sync" + "time" +) + +// HealthzAdaptor associates the /healthz endpoint with the LeaderElection object. +// It helps deal with the /healthz endpoint being set up prior to the LeaderElection. +// This contains the code needed to act as an adaptor between the leader +// election code and the health check code. It allows us to provide health +// status about the leader election. Most specifically about if the leader +// has failed to renew without exiting the process. In that case we should +// report not healthy and rely on the kubelet to take down the process. +type HealthzAdaptor struct { + pointerLock sync.Mutex + le *LeaderElector + timeout time.Duration +} + +// Name returns the name of the health check we are implementing. +func (l *HealthzAdaptor) Name() string { + return "leaderElection" +} + +// Check is called by the healthz endpoint handler. +// It fails (returns an error) if we own the lease but has not been able to renew it. 
+func (l *HealthzAdaptor) Check(req *http.Request) error { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + if l.le == nil { + return nil + } + return l.le.Check(l.timeout) +} + +// SetLeaderElection ties a leader election object to a HealthzAdaptor +func (l *HealthzAdaptor) SetLeaderElection(le *LeaderElector) { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + l.le = le +} + +// NewLeaderHealthzAdaptor creates a basic healthz adaptor to monitor a leader election. +// timeout determines the time beyond the lease expiry to be allowed for timeout. +// checks within the timeout period after the lease expires will still return healthy. +func NewLeaderHealthzAdaptor(timeout time.Duration) *HealthzAdaptor { + result := &HealthzAdaptor{ + timeout: timeout, + } + return result +} diff --git a/tools/leaderelection/healthzadaptor_test.go b/tools/leaderelection/healthzadaptor_test.go new file mode 100644 index 00000000..746d4913 --- /dev/null +++ b/tools/leaderelection/healthzadaptor_test.go @@ -0,0 +1,175 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "fmt" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + rl "k8s.io/client-go/tools/leaderelection/resourcelock" + "net/http" +) + +type fakeLock struct { + identity string +} + +// Get is a dummy to allow us to have a fakeLock for testing. 
+func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, err error) { + return nil, nil +} + +// Create is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Create(ler rl.LeaderElectionRecord) error { + return nil +} + +// Update is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Update(ler rl.LeaderElectionRecord) error { + return nil +} + +// RecordEvent is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) RecordEvent(string) {} + +// Identity is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Identity() string { + return fl.identity +} + +// Describe is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Describe() string { + return "Dummy implementation of lock for testing" +} + +// TestLeaderElectionHealthChecker tests that the healthcheck for leader election handles its edge cases. +func TestLeaderElectionHealthChecker(t *testing.T) { + current := time.Now() + req := &http.Request{} + + tests := []struct { + description string + expected error + adaptorTimeout time.Duration + elector *LeaderElector + }{ + { + description: "call check before leader elector initialized", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: nil, + }, + { + description: "call check when the the lease is far expired", + expected: fmt.Errorf("failed election to renew leadership on lease %s", "foo"), + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + Lock: &fakeLock{identity: "healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "healthTest", + }, + observedTime: current, + clock: clock.NewFakeClock(current.Add(time.Hour)), + }, + }, + { + description: "call check when the the lease is far expired but held by another server", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + 
Lock: &fakeLock{identity: "healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "otherServer", + }, + observedTime: current, + clock: clock.NewFakeClock(current.Add(time.Hour)), + }, + }, + { + description: "call check when the the lease is not expired", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + Lock: &fakeLock{identity: "healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "healthTest", + }, + observedTime: current, + clock: clock.NewFakeClock(current), + }, + }, + { + description: "call check when the the lease is expired but inside the timeout", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + Lock: &fakeLock{identity: "healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "healthTest", + }, + observedTime: current, + clock: clock.NewFakeClock(current.Add(time.Minute).Add(time.Second)), + }, + }, + } + + for _, test := range tests { + adaptor := NewLeaderHealthzAdaptor(test.adaptorTimeout) + if adaptor.le != nil { + t.Errorf("[%s] leaderChecker started with a LeaderElector %v", test.description, adaptor.le) + } + if test.elector != nil { + test.elector.config.WatchDog = adaptor + adaptor.SetLeaderElection(test.elector) + if adaptor.le == nil { + t.Errorf("[%s] adaptor failed to set the LeaderElector", test.description) + } + } + err := adaptor.Check(req) + if test.expected == nil { + if err == nil { + continue + } + t.Errorf("[%s] called check, expected no error but received \"%v\"", test.description, err) + } else { + if err == nil { + t.Errorf("[%s] called check and failed to received the expected error \"%v\"", test.description, test.expected) + } + if err.Error() != test.expected.Error() { + t.Errorf("[%s] called 
check, expected %v, received %v", test.description, test.expected, err) + } + } + } +} diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go index 1bd6167b..2096a599 100644 --- a/tools/leaderelection/leaderelection.go +++ b/tools/leaderelection/leaderelection.go @@ -56,6 +56,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" rl "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -90,6 +91,7 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { } return &LeaderElector{ config: lec, + clock: clock.RealClock{}, }, nil } @@ -111,6 +113,13 @@ type LeaderElectionConfig struct { // Callbacks are callbacks that are triggered during certain lifecycle // events of the LeaderElector Callbacks LeaderCallbacks + + // WatchDog is the associated health checker + // WatchDog may be null if its not needed/configured. + WatchDog *HealthzAdaptor + + // Name is the name of the resource lock for debugging + Name string } // LeaderCallbacks are callbacks that are triggered during certain @@ -139,6 +148,12 @@ type LeaderElector struct { // value observedRecord.HolderIdentity if the transition has // not yet been reported. reportedLeader string + + // clock is wrapper around time to allow for less flaky testing + clock clock.Clock + + // name is the name of the resource lock for debugging + name string } // Run starts the leader election loop @@ -163,6 +178,9 @@ func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { if err != nil { panic(err) } + if lec.WatchDog != nil { + lec.WatchDog.SetLeaderElection(le) + } le.Run(ctx) } @@ -257,14 +275,14 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { return false } le.observedRecord = leaderElectionRecord - le.observedTime = time.Now() + le.observedTime = le.clock.Now() return true } // 2. 
Record obtained, check the Identity & Time if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { le.observedRecord = *oldLeaderElectionRecord - le.observedTime = time.Now() + le.observedTime = le.clock.Now() } if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { @@ -287,7 +305,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { return false } le.observedRecord = leaderElectionRecord - le.observedTime = time.Now() + le.observedTime = le.clock.Now() return true } @@ -300,3 +318,19 @@ func (le *LeaderElector) maybeReportTransition() { go le.config.Callbacks.OnNewLeader(le.reportedLeader) } } + +// Check will determine if the current lease is expired by more than timeout. +func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error { + if !le.IsLeader() { + // Currently not concerned with the case that we are hot standby + return nil + } + // If we are more than timeout seconds after the lease duration then we are past the timeout + // on the lease renew. Time to start reporting ourselves as unhealthy. We should have + // died but conditions like deadlock can prevent this. 
(See #70819) + if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease { + return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name) + } + + return nil +} diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go index e99cf57a..842aebda 100644 --- a/tools/leaderelection/leaderelection_test.go +++ b/tools/leaderelection/leaderelection_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" core "k8s.io/client-go/testing" rl "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -257,6 +258,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { config: lec, observedRecord: test.observedRecord, observedTime: test.observedTime, + clock: clock.RealClock{}, } if test.expectSuccess != le.tryAcquireOrRenew() {