CLE (Coordinated Leader Election) controller and client changes

Jefftree 2024-07-21 20:06:03 +00:00
parent b5a62f14cd
commit c47ff1e1a9
17 changed files with 2827 additions and 15 deletions

View File

@@ -28,8 +28,9 @@ import (
"sort"
"time"
"github.com/blang/semver/v4"
"github.com/spf13/cobra"
v1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -78,7 +79,9 @@ import (
kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/utils/clock"
)
func init() {
@@ -289,6 +292,30 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
return startSATokenControllerInit(ctx, controllerContext, controllerName)
}
}
ver, err := semver.ParseTolerant(version.Get().String())
if err != nil {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
// Start component identity lease management
leaseCandidate, err := leaderelection.NewCandidate(
c.Client,
id,
"kube-system",
"kube-controller-manager",
clock.RealClock{},
ver.FinalizeVersion(),
ver.FinalizeVersion(), // TODO: Use compatibility version when it's available
[]v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
)
if err != nil {
return err
}
healthzHandler.AddHealthChecker(healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory))
go leaseCandidate.Run(ctx)
}
// Start the main lock
go leaderElectAndRun(ctx, c, id, electionChecker,
@@ -886,6 +913,7 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
Callbacks: callbacks,
WatchDog: electionChecker,
Name: leaseName,
Coordinated: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection),
})
panic("unreachable")

View File

@@ -24,8 +24,9 @@ import (
"os"
goruntime "runtime"
"github.com/blang/semver/v4"
"github.com/spf13/cobra"
coordinationv1 "k8s.io/api/coordination/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/authentication/authenticator"
@@ -56,8 +57,11 @@ import (
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/latest"
@@ -207,6 +211,34 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
})
readyzChecks = append(readyzChecks, handlerSyncCheck)
if cc.LeaderElection != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
binaryVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).BinaryVersion().String())
if err != nil {
return err
}
emulationVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).EmulationVersion().String())
if err != nil {
return err
}
// Start component identity lease management
leaseCandidate, err := leaderelection.NewCandidate(
cc.Client,
cc.LeaderElection.Lock.Identity(),
"kube-system",
"kube-scheduler",
clock.RealClock{},
binaryVersion.FinalizeVersion(),
emulationVersion.FinalizeVersion(),
[]coordinationv1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
)
if err != nil {
return err
}
readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory))
go leaseCandidate.Run(ctx)
}
// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
@@ -245,6 +277,9 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
}
// If leader election is enabled, run via LeaderElector until done, then exit.
if cc.LeaderElection != nil {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
cc.LeaderElection.Coordinated = true
}
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)

View File

@@ -1027,6 +1027,7 @@ EOF
--feature-gates="${FEATURE_GATES}" \
--authentication-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
--authorization-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
--leader-elect=false \
--master="https://${API_HOST}:${API_SECURE_PORT}" >"${SCHEDULER_LOG}" 2>&1 &
SCHEDULER_PID=$!
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package apiserver
import (
"context"
"fmt"
"os"
"time"
@@ -41,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/controller/leaderelection"
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/features"
@@ -145,6 +147,27 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.CoordinatedLeaderElection) {
leaseInformer := s.VersionedInformers.Coordination().V1().Leases()
lcInformer := s.VersionedInformers.Coordination().V1alpha1().LeaseCandidates()
// Ensure that informers are registered before starting. The Coordinated Leader Election
// controller is leader-elected and may register its informer handlers after the informers have already started.
_ = leaseInformer.Informer()
_ = lcInformer.Informer()
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-coordinated-leader-election-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go leaderelection.RunWithLeaderElection(hookContext, s.GenericAPIServer.LoopbackClientConfig, func() (func(ctx context.Context, workers int), error) {
controller, err := leaderelection.NewController(
leaseInformer,
lcInformer,
client.CoordinationV1(),
client.CoordinationV1alpha1(),
)
return controller.Run, err
})
return nil
})
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(

View File

@@ -0,0 +1,135 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"slices"
"time"
"github.com/blang/semver/v4"
v1 "k8s.io/api/coordination/v1"
v1alpha1 "k8s.io/api/coordination/v1alpha1"
"k8s.io/klog/v2"
)
func pickBestLeaderOldestEmulationVersion(candidates []*v1alpha1.LeaseCandidate) *v1alpha1.LeaseCandidate {
var electee *v1alpha1.LeaseCandidate
for _, c := range candidates {
if !validLeaseCandidateForOldestEmulationVersion(c) {
continue
}
if electee == nil || compare(electee, c) > 0 {
electee = c
}
}
if electee == nil {
klog.Infof("pickBestLeader: none found")
} else {
klog.Infof("pickBestLeader: %s %s", electee.Namespace, electee.Name)
}
return electee
}
func shouldReelect(candidates []*v1alpha1.LeaseCandidate, currentLeader *v1alpha1.LeaseCandidate) bool {
klog.Infof("shouldReelect for candidates: %+v", candidates)
pickedLeader := pickBestLeaderOldestEmulationVersion(candidates)
if pickedLeader == nil {
return false
}
return compare(currentLeader, pickedLeader) > 0
}
func pickBestStrategy(candidates []*v1alpha1.LeaseCandidate) v1.CoordinatedLeaseStrategy {
// TODO: This doesn't account for cycles within the preference graph
// We may have to do a topological sort to verify that the preference ordering is valid
var bestStrategy *v1.CoordinatedLeaseStrategy
for _, c := range candidates {
if len(c.Spec.PreferredStrategies) > 0 {
if bestStrategy == nil {
bestStrategy = &c.Spec.PreferredStrategies[0]
continue
}
if *bestStrategy != c.Spec.PreferredStrategies[0] {
if idx := slices.Index(c.Spec.PreferredStrategies, *bestStrategy); idx > 0 {
bestStrategy = &c.Spec.PreferredStrategies[0]
} else {
klog.Infof("Error: bad strategy ordering")
}
}
}
}
// bestStrategy is assumed to be non-nil here: every candidate is expected to
// list at least one preferred strategy
return *bestStrategy
}
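// Example (illustrative): with candidates preferring ["A"] and ["B", "A"],
// the second candidate ranks "A" below its own first choice, so
// pickBestStrategy returns "B"; when the current best does not appear later
// in a disagreeing candidate's list, the ordering is logged as bad and the
// current best is kept.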
func validLeaseCandidateForOldestEmulationVersion(l *v1alpha1.LeaseCandidate) bool {
_, err := semver.ParseTolerant(l.Spec.EmulationVersion)
if err != nil {
return false
}
_, err = semver.ParseTolerant(l.Spec.BinaryVersion)
return err == nil
}
func getEmulationVersion(l *v1alpha1.LeaseCandidate) semver.Version {
value := l.Spec.EmulationVersion
v, err := semver.ParseTolerant(value)
if err != nil {
return semver.Version{}
}
return v
}
func getBinaryVersion(l *v1alpha1.LeaseCandidate) semver.Version {
value := l.Spec.BinaryVersion
v, err := semver.ParseTolerant(value)
if err != nil {
return semver.Version{}
}
return v
}
// -1: lhs better, 1: rhs better
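// Under OldestEmulationVersion, "smaller" wins: emulation versions are
// compared first, then binary versions, then creation timestamps (older wins).
// For example, comparing a 1.19.0/1.19.0 candidate against a 1.19.0/1.20.0
// candidate returns -1, so the lhs is preferred.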
func compare(lhs, rhs *v1alpha1.LeaseCandidate) int {
lhsVersion := getEmulationVersion(lhs)
rhsVersion := getEmulationVersion(rhs)
result := lhsVersion.Compare(rhsVersion)
if result == 0 {
lhsVersion := getBinaryVersion(lhs)
rhsVersion := getBinaryVersion(rhs)
result = lhsVersion.Compare(rhsVersion)
}
if result == 0 {
if lhs.CreationTimestamp.After(rhs.CreationTimestamp.Time) {
return 1
}
return -1
}
return result
}
func isLeaseExpired(lease *v1.Lease) bool {
currentTime := time.Now()
return lease.Spec.RenewTime == nil ||
lease.Spec.LeaseDurationSeconds == nil ||
lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second).Before(currentTime)
}
func isLeaseCandidateExpired(lease *v1alpha1.LeaseCandidate) bool {
currentTime := time.Now()
return lease.Spec.RenewTime == nil ||
lease.Spec.RenewTime.Add(leaseCandidateValidDuration).Before(currentTime)
}

View File

@@ -0,0 +1,522 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"testing"
"time"
"github.com/blang/semver/v4"
v1 "k8s.io/api/coordination/v1"
v1alpha1 "k8s.io/api/coordination/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestPickBestLeaderOldestEmulationVersion(t *testing.T) {
tests := []struct {
name string
candidates []*v1alpha1.LeaseCandidate
want *v1alpha1.LeaseCandidate
}{
{
name: "empty",
candidates: []*v1alpha1.LeaseCandidate{},
want: nil,
},
{
name: "single candidate",
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
},
want: &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
},
{
name: "multiple candidates, different emulation versions",
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)},
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate2",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.2.0",
BinaryVersion: "0.2.0",
},
},
},
want: &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "v1",
BinaryVersion: "v1",
},
},
},
{
name: "multiple candidates, same emulation versions, different binary versions",
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)},
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate2",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.2.0",
},
},
},
want: &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
},
{
name: "multiple candidates, same emulation versions, same binary versions, different creation timestamps",
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)},
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate2",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
},
want: &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: "candidate1",
Namespace: "default",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := pickBestLeaderOldestEmulationVersion(tt.candidates)
if got != nil && tt.want != nil {
if got.Name != tt.want.Name || got.Namespace != tt.want.Namespace {
t.Errorf("pickBestLeaderOldestEmulationVersion() = %v, want %v", got, tt.want)
}
} else if got != tt.want {
t.Errorf("pickBestLeaderOldestEmulationVersion() = %v, want %v", got, tt.want)
}
})
}
}
func TestValidLeaseCandidateForOldestEmulationVersion(t *testing.T) {
tests := []struct {
name string
candidate *v1alpha1.LeaseCandidate
want bool
}{
{
name: "valid emulation and binary versions",
candidate: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "0.1.0",
},
},
want: true,
},
{
name: "invalid emulation version",
candidate: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "invalid",
BinaryVersion: "0.1.0",
},
},
want: false,
},
{
name: "invalid binary version",
candidate: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
BinaryVersion: "invalid",
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := validLeaseCandidateForOldestEmulationVersion(tt.candidate)
if got != tt.want {
t.Errorf("validLeaseCandidateForOldestEmulationVersion() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetEmulationVersion(t *testing.T) {
tests := []struct {
name string
candidate *v1alpha1.LeaseCandidate
want semver.Version
}{
{
name: "valid emulation version",
candidate: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "0.1.0",
},
},
want: semver.MustParse("0.1.0"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getEmulationVersion(tt.candidate)
if got.FinalizeVersion() != tt.want.FinalizeVersion() {
t.Errorf("getEmulationVersion() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetBinaryVersion(t *testing.T) {
tests := []struct {
name string
candidate *v1alpha1.LeaseCandidate
want semver.Version
}{
{
name: "valid binary version",
candidate: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
BinaryVersion: "0.3.0",
},
},
want: semver.MustParse("0.3.0"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getBinaryVersion(tt.candidate)
if got.FinalizeVersion() != tt.want.FinalizeVersion() {
t.Errorf("getBinaryVersion() = %v, want %v", got, tt.want)
}
})
}
}
func TestCompare(t *testing.T) {
nowTime := time.Now()
cases := []struct {
name string
lhs *v1alpha1.LeaseCandidate
rhs *v1alpha1.LeaseCandidate
expectedResult int
}{
{
name: "identical versions earlier timestamp",
lhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.21.0",
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.Time{Time: nowTime.Add(time.Duration(1))},
},
},
rhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.21.0",
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.Time{Time: nowTime},
},
},
expectedResult: 1,
},
{
name: "no lhs version",
lhs: &v1alpha1.LeaseCandidate{},
rhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.21.0",
},
},
expectedResult: -1,
},
{
name: "no rhs version",
lhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.21.0",
},
},
rhs: &v1alpha1.LeaseCandidate{},
expectedResult: 1,
},
{
name: "invalid lhs version",
lhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "xyz",
BinaryVersion: "xyz",
},
},
rhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.21.0",
},
},
expectedResult: -1,
},
{
name: "invalid rhs version",
lhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.21.0",
},
},
rhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "xyz",
BinaryVersion: "xyz",
},
},
expectedResult: 1,
},
{
name: "lhs less than rhs",
lhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.20.0",
},
},
rhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.20.0",
},
},
expectedResult: -1,
},
{
name: "rhs less than lhs",
lhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.20.0",
BinaryVersion: "1.20.0",
},
},
rhs: &v1alpha1.LeaseCandidate{
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.20.0",
},
},
expectedResult: 1,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result := compare(tc.lhs, tc.rhs)
if result != tc.expectedResult {
t.Errorf("Expected comparison result of %d but got %d", tc.expectedResult, result)
}
})
}
}
func TestShouldReelect(t *testing.T) {
cases := []struct {
name string
candidates []*v1alpha1.LeaseCandidate
currentLeader *v1alpha1.LeaseCandidate
expectResult bool
}{
{
name: "candidate with newer binary version",
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "component-identity-2",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.20.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
currentLeader: &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
expectResult: false,
},
{
name: "no newer candidates",
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "component-identity-2",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
currentLeader: &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
expectResult: false,
},
{
name: "no candidates",
candidates: []*v1alpha1.LeaseCandidate{},
currentLeader: &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
expectResult: false,
},
// TODO: Add test cases where candidates have invalid version numbers
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result := shouldReelect(tc.candidates, tc.currentLeader)
if tc.expectResult != result {
t.Errorf("Expected %t but got %t", tc.expectResult, result)
}
})
}
}

View File

@@ -0,0 +1,399 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/coordination/v1"
v1alpha1 "k8s.io/api/coordination/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coordinationv1informers "k8s.io/client-go/informers/coordination/v1"
coordinationv1alpha1 "k8s.io/client-go/informers/coordination/v1alpha1"
coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
coordinationv1alpha1client "k8s.io/client-go/kubernetes/typed/coordination/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
const (
controllerName = "leader-election-controller"
ElectedByAnnotationName = "coordination.k8s.io/elected-by" // Value should be set to controllerName
// Requeue interval is the interval at which a Lease is requeued to verify that it is being renewed properly.
requeueInterval = 5 * time.Second
defaultLeaseDurationSeconds int32 = 5
electionDuration = 5 * time.Second
leaseCandidateValidDuration = 5 * time.Minute
)
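// Illustrative timing under these constants: a candidate whose RenewTime is
// more than electionDuration (5s) old is pinged; an election proceeds once
// all pings are acked or electionDuration elapses; the elected Lease is
// written with defaultLeaseDurationSeconds (5s) and requeued every
// requeueInterval (5s) to verify that it is being renewed.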
// Controller is the leader election controller, which observes component identity leases for
// components that have self-nominated as candidate leaders for leases and elects leaders
// for those leases, favoring candidates with the oldest emulation version (ties are broken
// by binary version, then by creation time).
type Controller struct {
leaseInformer coordinationv1informers.LeaseInformer
leaseClient coordinationv1client.CoordinationV1Interface
leaseRegistration cache.ResourceEventHandlerRegistration
leaseCandidateInformer coordinationv1alpha1.LeaseCandidateInformer
leaseCandidateClient coordinationv1alpha1client.CoordinationV1alpha1Interface
leaseCandidateRegistration cache.ResourceEventHandlerRegistration
queue workqueue.TypedRateLimitingInterface[types.NamespacedName]
}
func (c *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer func() {
err := c.leaseInformer.Informer().RemoveEventHandler(c.leaseRegistration)
if err != nil {
klog.Warning("error removing leaseInformer eventhandler")
}
err = c.leaseCandidateInformer.Informer().RemoveEventHandler(c.leaseCandidateRegistration)
if err != nil {
klog.Warning("error removing leaseCandidateInformer eventhandler")
}
}()
if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.leaseRegistration.HasSynced, c.leaseCandidateRegistration.HasSynced) {
return
}
// This controller is leader-elected and may start after informers have already started, so list existing candidates on startup.
lcs, err := c.leaseCandidateInformer.Lister().List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}
for _, lc := range lcs {
c.processCandidate(lc)
}
klog.Infof("Workers: %d", workers)
for i := 0; i < workers; i++ {
klog.Infof("Starting worker")
go wait.UntilWithContext(ctx, c.runElectionWorker, time.Second)
}
<-ctx.Done()
}
func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCandidateInformer coordinationv1alpha1.LeaseCandidateInformer, leaseClient coordinationv1client.CoordinationV1Interface, leaseCandidateClient coordinationv1alpha1client.CoordinationV1alpha1Interface) (*Controller, error) {
c := &Controller{
leaseInformer: leaseInformer,
leaseCandidateInformer: leaseCandidateInformer,
leaseClient: leaseClient,
leaseCandidateClient: leaseCandidateClient,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{Name: controllerName}),
}
leaseSynced, err := leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.processLease(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.processLease(newObj)
},
DeleteFunc: func(oldObj interface{}) {
c.processLease(oldObj)
},
})
if err != nil {
return nil, err
}
leaseCandidateSynced, err := leaseCandidateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.processCandidate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.processCandidate(newObj)
},
DeleteFunc: func(oldObj interface{}) {
c.processCandidate(oldObj)
},
})
if err != nil {
return nil, err
}
c.leaseRegistration = leaseSynced
c.leaseCandidateRegistration = leaseCandidateSynced
return c, nil
}
func (c *Controller) runElectionWorker(ctx context.Context) {
for c.processNextElectionItem(ctx) {
}
}
func (c *Controller) processNextElectionItem(ctx context.Context) bool {
key, shutdown := c.queue.Get()
if shutdown {
return false
}
completed, err := c.reconcileElectionStep(ctx, key)
utilruntime.HandleError(err)
if completed {
defer c.queue.AddAfter(key, requeueInterval)
}
c.queue.Done(key)
return true
}
func (c *Controller) processCandidate(obj any) {
lc, ok := obj.(*v1alpha1.LeaseCandidate)
if !ok {
return
}
if lc == nil {
return
}
// Ignore candidates with an unanswered ping (PingTime set): a reelection for their lease is already in progress
if lc.Spec.PingTime != nil {
return
}
c.queue.Add(types.NamespacedName{Namespace: lc.Namespace, Name: lc.Spec.LeaseName})
}
func (c *Controller) processLease(obj any) {
lease, ok := obj.(*v1.Lease)
if !ok {
return
}
c.queue.Add(types.NamespacedName{Namespace: lease.Namespace, Name: lease.Name})
}
func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, leaseNN types.NamespacedName) (bool, error) {
lease, err := c.leaseInformer.Lister().Leases(leaseNN.Namespace).Get(leaseNN.Name)
if err != nil && !apierrors.IsNotFound(err) {
return false, fmt.Errorf("error reading lease: %w", err)
} else if apierrors.IsNotFound(err) {
return true, nil
}
if isLeaseExpired(lease) || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" {
return true, nil
}
prelimStrategy := pickBestStrategy(candidates)
if prelimStrategy != v1.OldestEmulationVersion {
klog.V(2).Infof("strategy %s is not recognized by CLE.", prelimStrategy)
return false, nil
}
prelimElectee := pickBestLeaderOldestEmulationVersion(candidates)
if prelimElectee == nil {
return false, nil
} else if lease != nil && lease.Spec.HolderIdentity != nil && prelimElectee.Name == *lease.Spec.HolderIdentity {
klog.V(2).Infof("Leader %s is already most optimal for lease %s %s", prelimElectee.Name, lease.Namespace, lease.Name)
return false, nil
}
return true, nil
}
// reconcileElectionStep advances an election by one step.
// A step looks at the current state of the Lease and LeaseCandidates and takes one of the following actions:
// - do nothing (because the leader is already optimal, or we are still waiting for an event)
// - request acks from candidates (update LeaseCandidate PingTime)
// - elect the most optimal candidate (update the Lease object)
// Instead of keeping a map and lock on election state, the state is
// recalculated every time from the lease and the set of available candidates:
// PingTime + electionDuration > time.Now: we just asked all candidates to ack and are still waiting for responses
// PingTime + electionDuration < time.Now: a candidate did not ack within electionDuration; clear its ping and continue the election
// RenewTime + electionDuration > time.Now: the candidate acked within electionDuration and may participate in the election
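// Worked example with electionDuration = 5s: a candidate whose PingTime is 2s
// old keeps the step waiting (no state change); a candidate whose PingTime is
// 7s old has timed out, so its PingTime is cleared and the election is
// fast-tracked; a candidate with no PingTime and a RenewTime 6s old triggers
// a fresh ping to all candidates.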
func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.NamespacedName) (requeue bool, err error) {
now := time.Now()
candidates, err := c.listAdmissableCandidates(leaseNN)
if err != nil {
return true, err
} else if len(candidates) == 0 {
return false, nil
}
klog.V(4).Infof("reconcileElectionStep %q %q, candidates: %d", leaseNN.Namespace, leaseNN.Name, len(candidates))
// Check if an election is really needed by looking at the current lease
// and set of candidates
needElection, err := c.electionNeeded(candidates, leaseNN)
if !needElection || err != nil {
return needElection, err
}
fastTrackElection := false
for _, candidate := range candidates {
// If a candidate has a PingTime within the election duration, it has not acked
// yet; wait until we receive its response or the ping times out
if candidate.Spec.PingTime != nil {
if candidate.Spec.PingTime.Add(electionDuration).After(now) {
// continue waiting for the election to timeout
return false, nil
} else {
// election timed out without ack. Clear and start election.
fastTrackElection = true
clone := candidate.DeepCopy()
clone.Spec.PingTime = nil
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return false, err
}
break
}
}
}
if !fastTrackElection {
continueElection := true
for _, candidate := range candidates {
// If a candidate's renewTime is more than electionDuration old, we have to ping it.
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).Before(now) {
continueElection = false
break
}
}
if !continueElection {
// Send an "are you alive" signal to all candidates
for _, candidate := range candidates {
clone := candidate.DeepCopy()
clone.Spec.PingTime = &metav1.MicroTime{Time: time.Now()}
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return false, err
}
}
return true, nil
}
}
var ackedCandidates []*v1alpha1.LeaseCandidate
for _, candidate := range candidates {
if candidate.Spec.RenewTime.Add(electionDuration).After(now) {
ackedCandidates = append(ackedCandidates, candidate)
}
}
if len(ackedCandidates) == 0 {
return false, fmt.Errorf("no available candidates")
}
strategy := pickBestStrategy(ackedCandidates)
if strategy != v1.OldestEmulationVersion {
klog.V(2).Infof("strategy %s is not recognized by CLE.", strategy)
return false, nil
}
electee := pickBestLeaderOldestEmulationVersion(ackedCandidates)
if electee == nil {
return false, fmt.Errorf("should not happen, could not find suitable electee")
}
electeeName := electee.Name
// create the leader election lease
leaderLease := &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: leaseNN.Namespace,
Name: leaseNN.Name,
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: &electeeName,
Strategy: &strategy,
LeaseDurationSeconds: ptr.To(defaultLeaseDurationSeconds),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
_, err = c.leaseClient.Leases(leaseNN.Namespace).Create(ctx, leaderLease, metav1.CreateOptions{})
// If the create was successful, then we can return here.
if err == nil {
klog.Infof("Created lease %q %q for %q", leaseNN.Namespace, leaseNN.Name, electee.Name)
return true, nil
}
// Any error other than AlreadyExists is fatal for this step
if !apierrors.IsAlreadyExists(err) {
return false, err
}
existingLease, err := c.leaseClient.Leases(leaseNN.Namespace).Get(ctx, leaseNN.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
leaseClone := existingLease.DeepCopy()
// Update the Lease if it either does not have a holder or is expired
isExpired := isLeaseExpired(existingLease)
if leaseClone.Spec.HolderIdentity == nil || *leaseClone.Spec.HolderIdentity == "" || (isExpired && *leaseClone.Spec.HolderIdentity != electeeName) {
klog.Infof("lease %q %q is expired, resetting it and setting holder to %q", leaseNN.Namespace, leaseNN.Name, electee.Name)
leaseClone.Spec.Strategy = &strategy
leaseClone.Spec.PreferredHolder = nil
if leaseClone.ObjectMeta.Annotations == nil {
leaseClone.ObjectMeta.Annotations = make(map[string]string)
}
leaseClone.ObjectMeta.Annotations[ElectedByAnnotationName] = controllerName
leaseClone.Spec.HolderIdentity = &electeeName
leaseClone.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
leaseClone.Spec.LeaseDurationSeconds = ptr.To(defaultLeaseDurationSeconds)
leaseClone.Spec.AcquireTime = nil
_, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, leaseClone, metav1.UpdateOptions{})
if err != nil {
return false, err
}
} else if leaseClone.Spec.HolderIdentity != nil && *leaseClone.Spec.HolderIdentity != electeeName {
klog.Infof("lease %q %q already exists for holder %q but should be held by %q, marking preferredHolder", leaseNN.Namespace, leaseNN.Name, *leaseClone.Spec.HolderIdentity, electee.Name)
leaseClone.Spec.PreferredHolder = &electeeName
leaseClone.Spec.Strategy = &strategy
_, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, leaseClone, metav1.UpdateOptions{})
if err != nil {
return false, err
}
}
return true, nil
}
func (c *Controller) listAdmissableCandidates(leaseNN types.NamespacedName) ([]*v1alpha1.LeaseCandidate, error) {
leases, err := c.leaseCandidateInformer.Lister().LeaseCandidates(leaseNN.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
var results []*v1alpha1.LeaseCandidate
for _, l := range leases {
if l.Spec.LeaseName != leaseNN.Name {
continue
}
if !isLeaseCandidateExpired(l) {
results = append(results, l)
} else {
klog.Infof("LeaseCandidate %s is expired", l.Name)
}
}
return results, nil
}
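// Minimal wiring sketch, mirroring the apiserver post-start hook; "client" is
// assumed to be a kubernetes.Interface and error handling is the caller's:
//
//	informerFactory := informers.NewSharedInformerFactory(client, 0)
//	controller, err := leaderelection.NewController(
//		informerFactory.Coordination().V1().Leases(),
//		informerFactory.Coordination().V1alpha1().LeaseCandidates(),
//		client.CoordinationV1(),
//		client.CoordinationV1alpha1(),
//	)
//	if err != nil {
//		return err
//	}
//	informerFactory.Start(ctx.Done())
//	go controller.Run(ctx, 1) // one worker, as in the apiserver hook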

View File

@@ -0,0 +1,698 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"context"
"testing"
"time"
v1 "k8s.io/api/coordination/v1"
v1alpha1 "k8s.io/api/coordination/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"
"k8s.io/client-go/tools/cache"
)
func TestReconcileElectionStep(t *testing.T) {
tests := []struct {
name string
leaseNN types.NamespacedName
candidates []*v1alpha1.LeaseCandidate
existingLease *v1.Lease
expectedHolderIdentity *string
expectedPreferredHolder string
expectedRequeue bool
expectedError bool
candidatesPinged bool
}{
{
name: "no candidates, no lease, noop",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{},
existingLease: nil,
expectedHolderIdentity: nil,
expectedRequeue: false,
expectedError: false,
},
{
name: "no candidates, lease exists. noop, not managed by CLE",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{},
existingLease: &v1.Lease{},
expectedHolderIdentity: nil,
expectedRequeue: false,
expectedError: false,
},
{
name: "candidates exist, no existing lease should create lease",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
existingLease: nil,
expectedHolderIdentity: ptr.To("component-identity-1"),
expectedRequeue: true,
expectedError: false,
},
{
name: "candidates exist, lease exists, unoptimal should set preferredHolder",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-2",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.18.0",
BinaryVersion: "1.18.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
existingLease: &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-1"),
LeaseDurationSeconds: ptr.To(int32(10)),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
},
},
expectedHolderIdentity: ptr.To("component-identity-1"),
expectedPreferredHolder: "component-identity-2",
expectedRequeue: true,
expectedError: false,
},
{
name: "candidates exist, should only elect leader from acked candidates",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-2",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.20.0",
BinaryVersion: "1.20.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
existingLease: nil,
expectedHolderIdentity: ptr.To("component-identity-2"),
expectedRequeue: true,
expectedError: false,
},
{
name: "candidates exist, lease exists, lease expired",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
existingLease: &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-expired"),
LeaseDurationSeconds: ptr.To(int32(10)),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
},
},
expectedHolderIdentity: ptr.To("component-identity-1"),
expectedRequeue: true,
expectedError: false,
},
{
name: "candidates exist, no acked candidates should return error",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
existingLease: nil,
expectedHolderIdentity: nil,
expectedRequeue: false,
expectedError: true,
},
{
name: "candidates exist, should ping on election",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
existingLease: nil,
expectedHolderIdentity: nil,
expectedRequeue: true,
expectedError: false,
candidatesPinged: true,
},
{
name: "candidates exist, ping within electionDuration should cause no state change",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PingTime: ptr.To(metav1.NewMicroTime(time.Now())),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
existingLease: nil,
expectedHolderIdentity: nil,
expectedRequeue: false,
expectedError: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
_ = informerFactory.Coordination().V1alpha1().LeaseCandidates().Lister()
controller, err := NewController(
informerFactory.Coordination().V1().Leases(),
informerFactory.Coordination().V1alpha1().LeaseCandidates(),
client.CoordinationV1(),
client.CoordinationV1alpha1(),
)
if err != nil {
t.Fatal(err)
}
go informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
// Set up the fake client with the existing lease
if tc.existingLease != nil {
_, err = client.CoordinationV1().Leases(tc.existingLease.Namespace).Create(ctx, tc.existingLease, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
// Set up the fake client with the candidates
for _, candidate := range tc.candidates {
_, err = client.CoordinationV1alpha1().LeaseCandidates(candidate.Namespace).Create(ctx, candidate, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
cache.WaitForCacheSync(ctx.Done(), controller.leaseCandidateInformer.Informer().HasSynced)
requeue, err := controller.reconcileElectionStep(ctx, tc.leaseNN)
if requeue != tc.expectedRequeue {
t.Errorf("reconcileElectionStep() requeue = %v, want %v", requeue, tc.expectedRequeue)
}
if tc.expectedError && err == nil {
t.Errorf("reconcileElectionStep() error = %v, want error", err)
} else if !tc.expectedError && err != nil {
t.Errorf("reconcileElectionStep() error = %v, want nil", err)
}
// Check the lease holder identity
if tc.expectedHolderIdentity != nil {
lease, err := client.CoordinationV1().Leases(tc.leaseNN.Namespace).Get(ctx, tc.leaseNN.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != *tc.expectedHolderIdentity {
t.Errorf("reconcileElectionStep() holderIdentity = %v, want %v", *lease.Spec.HolderIdentity, *tc.expectedHolderIdentity)
}
if tc.expectedPreferredHolder != "" {
if lease.Spec.PreferredHolder == nil || *lease.Spec.PreferredHolder != tc.expectedPreferredHolder {
t.Errorf("reconcileElectionStep() preferredHolder = %v, want %v", lease.Spec.PreferredHolder, tc.expectedPreferredHolder)
}
}
}
// Verify that ping to candidate was issued
if tc.candidatesPinged {
pinged := false
candidatesList, err := client.CoordinationV1alpha1().LeaseCandidates(tc.leaseNN.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
oldCandidateMap := make(map[string]*v1alpha1.LeaseCandidate)
for _, candidate := range tc.candidates {
oldCandidateMap[candidate.Name] = candidate
}
for _, candidate := range candidatesList.Items {
if candidate.Spec.PingTime != nil {
if oldCandidateMap[candidate.Name].Spec.PingTime == nil {
pinged = true
break
}
}
}
if !pinged {
t.Errorf("reconcileElectionStep() expected candidates to be pinged")
}
}
})
}
}
func TestController(t *testing.T) {
cases := []struct {
name string
leaseNN types.NamespacedName
createAfterControllerStart []*v1alpha1.LeaseCandidate
deleteAfterControllerStart []types.NamespacedName
expectedLeaderLeases []*v1.Lease
}{
{
name: "single candidate leader election",
leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"},
createAfterControllerStart: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
expectedLeaderLeases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-1"),
},
},
},
},
{
name: "multiple candidate leader election",
leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"},
createAfterControllerStart: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-identity-2",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.20.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-identity-3",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.20.0",
BinaryVersion: "1.20.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
expectedLeaderLeases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-1"),
},
},
},
},
{
name: "deletion of lease triggers reelection",
leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"},
createAfterControllerStart: []*v1alpha1.LeaseCandidate{
{
// Leader lease
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1alpha1.LeaseCandidateSpec{},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
deleteAfterControllerStart: []types.NamespacedName{
{Namespace: "kube-system", Name: "component-A"},
},
expectedLeaderLeases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-1"),
},
},
},
},
{
name: "better candidate triggers reelection",
leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"},
createAfterControllerStart: []*v1alpha1.LeaseCandidate{
{
// Leader lease
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1alpha1.LeaseCandidateSpec{},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.20.0",
BinaryVersion: "1.20.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-identity-2",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
},
expectedLeaderLeases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "component-A",
Annotations: map[string]string{
ElectedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-2"),
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
controller, err := NewController(
informerFactory.Coordination().V1().Leases(),
informerFactory.Coordination().V1alpha1().LeaseCandidates(),
client.CoordinationV1(),
client.CoordinationV1alpha1(),
)
if err != nil {
t.Fatal(err)
}
go informerFactory.Start(ctx.Done())
go controller.Run(ctx, 1)
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
// Mock out the removal of preferredHolder leases.
// When controllers are running, they are expected to do this voluntarily
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, expectedLease := range tc.expectedLeaderLeases {
lease, err := client.CoordinationV1().Leases(expectedLease.Namespace).Get(ctx, expectedLease.Name, metav1.GetOptions{})
if err == nil {
if preferredHolder := lease.Spec.PreferredHolder; preferredHolder != nil {
err = client.CoordinationV1().Leases(expectedLease.Namespace).Delete(ctx, expectedLease.Name, metav1.DeleteOptions{})
if err != nil {
runtime.HandleError(err)
}
}
}
}
}
}
}()
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
// Mock out leasecandidate ack.
// When controllers are running, they are expected to watch and ack
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, lc := range tc.createAfterControllerStart {
lease, err := client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Get(ctx, lc.Name, metav1.GetOptions{})
if err == nil {
if lease.Spec.PingTime != nil {
c := lease.DeepCopy()
c.Spec.PingTime = nil
c.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
_, err = client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Update(ctx, c, metav1.UpdateOptions{})
if err != nil {
runtime.HandleError(err)
}
}
}
}
}
}
}()
for _, obj := range tc.createAfterControllerStart {
_, err := client.CoordinationV1alpha1().LeaseCandidates(obj.Namespace).Create(ctx, obj, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
for _, obj := range tc.deleteAfterControllerStart {
err := client.CoordinationV1alpha1().LeaseCandidates(obj.Namespace).Delete(ctx, obj.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatal(err)
}
}
for _, expectedLease := range tc.expectedLeaderLeases {
var lease *v1.Lease
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 600*time.Second, true, func(ctx context.Context) (done bool, err error) {
lease, err = client.CoordinationV1().Leases(expectedLease.Namespace).Get(ctx, expectedLease.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return true, err
}
if expectedLease.Spec.HolderIdentity == nil || lease.Spec.HolderIdentity == nil {
return expectedLease.Spec.HolderIdentity == nil && lease.Spec.HolderIdentity == nil, nil
}
if expectedLease.Spec.HolderIdentity != nil && lease.Spec.HolderIdentity != nil && *expectedLease.Spec.HolderIdentity != *lease.Spec.HolderIdentity {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
if lease.Spec.HolderIdentity == nil {
t.Fatalf("Expected HolderIdentity of %s but got nil", expectedLease.Name)
}
if *lease.Spec.HolderIdentity != *expectedLease.Spec.HolderIdentity {
t.Errorf("Expected HolderIdentity of %s but got %s", *expectedLease.Spec.HolderIdentity, *lease.Spec.HolderIdentity)
}
}
})
}
}

View File

@@ -0,0 +1,91 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"context"
"os"
"time"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
)
type NewRunner func() (func(ctx context.Context, workers int), error)
// RunWithLeaderElection runs the provided runner function with leader election.
// newRunnerFn might be called multiple times, and it should return another
// controller instance's Run method each time.
// RunWithLeaderElection only returns when the context is done, or initial
// leader election fails.
func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner) {
var cancel context.CancelFunc
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
ctx, cancel = context.WithCancel(ctx)
var err error
run, err := newRunnerFn()
if err != nil {
klog.Infof("Error creating runner: %v", err)
return
}
run(ctx, 1)
},
OnStoppedLeading: func() {
// cancel is nil if leadership was never acquired
if cancel != nil {
cancel()
}
},
}
hostname, err := os.Hostname()
if err != nil {
klog.Infof("Error parsing hostname: %v", err)
return
}
rl, err := resourcelock.NewFromKubeconfig(
"leases",
"kube-system",
controllerName,
resourcelock.ResourceLockConfig{
Identity: hostname + "_" + string(uuid.NewUUID()),
},
config,
10*time.Second,
)
if err != nil {
klog.Infof("Error creating resourcelock: %v", err)
return
}
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: callbacks,
Name: controllerName,
})
if err != nil {
klog.Infof("Error creating leader elector: %v", err)
return
}
le.Run(ctx)
}
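
For orientation, a minimal caller sketch of RunWithLeaderElection follows. This is an illustration under assumptions, not code from this commit: the import path and the fakeController type are placeholders.

package main

import (
	"context"

	"k8s.io/client-go/rest"

	// Assumed import path for the package that hosts RunWithLeaderElection.
	"k8s.io/kubernetes/pkg/controlplane/controller/leaderelection"
)

// fakeController is a placeholder controller; only its Run signature matters.
type fakeController struct{}

func (c *fakeController) Run(ctx context.Context, workers int) { <-ctx.Done() }

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	// newRunnerFn is invoked on every leadership acquisition and hands back
	// the Run method of a fresh controller instance.
	leaderelection.RunWithLeaderElection(context.Background(), config, func() (func(ctx context.Context, workers int), error) {
		return (&fakeController{}).Run, nil
	})
}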

View File

@@ -91,6 +91,7 @@
ignoredSubTrees:
- "./staging/src/k8s.io/client-go/tools/cache/testing"
- "./staging/src/k8s.io/client-go/tools/leaderelection/resourcelock"
- "./staging/src/k8s.io/client-go/tools/leaderelection"
- "./staging/src/k8s.io/client-go/tools/portforward"
- "./staging/src/k8s.io/client-go/tools/record"
- "./staging/src/k8s.io/client-go/tools/events"

View File

@@ -159,6 +159,9 @@ type LeaderElectionConfig struct {
// Name is the name of the resource lock for debugging
Name string
// Coordinated will use the Coordinated Leader Election feature
Coordinated bool
}
// LeaderCallbacks are callbacks that are triggered during certain
@@ -249,7 +252,11 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
succeeded = le.tryAcquireOrRenew(ctx)
if !le.config.Coordinated {
succeeded = le.tryAcquireOrRenew(ctx)
} else {
succeeded = le.tryCoordinatedRenew(ctx)
}
le.maybeReportTransition()
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
@@ -272,7 +279,11 @@ func (le *LeaderElector) renew(ctx context.Context) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(timeoutCtx), nil
if !le.config.Coordinated {
return le.tryAcquireOrRenew(timeoutCtx), nil
} else {
return le.tryCoordinatedRenew(timeoutCtx), nil
}
}, timeoutCtx.Done())
le.maybeReportTransition()
@@ -282,7 +293,6 @@ func (le *LeaderElector) renew(ctx context.Context) {
return
}
le.metrics.leaderOff(le.config.Name)
klog.Infof("failed to renew lease %v: %v", desc, err)
cancel()
}, le.config.RetryPeriod, ctx.Done())
@@ -315,6 +325,81 @@ func (le *LeaderElector) release() bool {
return true
}
// tryCoordinatedRenew checks if it acquired a lease and tries to renew the
// lease if it has already been acquired. Returns true on success else returns
// false.
func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}
// 1. obtain the electionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
return false
}
// 2. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
}
hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time)
if hasExpired {
klog.Infof("lock has expired: %v", le.config.Lock.Describe())
return false
}
if !le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe())
return false
}
// 2b. If the lease has been marked as "end of term", don't renew it
if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" {
klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe())
// TODO: Instead of letting the lease expire, the holder may delete it directly.
// This will not be compatible with all controllers, so it needs to be opt-in behavior.
// We must ensure all code guarded by this lease has successfully completed
// prior to releasing, or there may be two processes acting on the
// critical path simultaneously.
// Usually once this returns false, the process is terminated.
// xref: OnStoppedLeading
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to its default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
leaderElectionRecord.Strategy = oldLeaderElectionRecord.Strategy
le.metrics.slowpathExercised(le.config.Name)
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
// update the lock itself
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}
le.setObservedRecord(&leaderElectionRecord)
return true
}
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.

View File

@@ -25,6 +25,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -37,8 +38,6 @@ import (
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
"github.com/stretchr/testify/assert"
)
func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) {
@@ -353,6 +352,147 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}
}
func TestTryCoordinatedRenew(t *testing.T) {
objectType := "leases"
clock := clock.RealClock{}
future := clock.Now().Add(1000 * time.Hour)
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedTime time.Time
retryAfter time.Duration
reactors []Reactor
expectedEvents []string
expectSuccess bool
transitionLeader bool
outHolder string
}{
{
name: "don't acquire from led, acked object",
reactors: []Reactor{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
observedTime: future,
expectSuccess: false,
outHolder: "bing",
},
{
name: "renew already acquired object",
reactors: []Reactor{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "update",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
observedTime: future,
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
expectSuccess: true,
outHolder: "baz",
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: recorder,
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
lock = &rl.LeaseLock{
LeaseMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoordinationV1(),
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
Coordinated: true,
}
observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock,
metrics: globalMetricsFactory.newLeaderMetrics(),
}
if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
if test.retryAfter != 0 {
time.Sleep(test.retryAfter)
if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
}
} else {
t.Errorf("unexpected result of gryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
}
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("wrong number of api interactions")
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("leader should have transitioned but did not")
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("leader should not have transitioned but did")
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
}
assertEqualEvents(t, test.expectedEvents, recorder.Events)
})
}
}
// Will test leader election using lease as the resource
func TestTryAcquireOrRenewLeases(t *testing.T) {
testTryAcquireOrRenew(t, "leases")

View File

@@ -0,0 +1,196 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"context"
"time"
v1 "k8s.io/api/coordination/v1"
v1alpha1 "k8s.io/api/coordination/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
coordinationv1alpha1client "k8s.io/client-go/kubernetes/typed/coordination/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
const requeueInterval = 5 * time.Minute
type LeaseCandidate struct {
LeaseClient coordinationv1alpha1client.LeaseCandidateInterface
LeaseCandidateInformer cache.SharedIndexInformer
InformerFactory informers.SharedInformerFactory
HasSynced cache.InformerSynced
// There will be at most one item in this queue (since we only watch one object)
queue workqueue.TypedRateLimitingInterface[int]
name string
namespace string
// controller lease
leaseName string
Clock clock.Clock
binaryVersion, emulationVersion string
preferredStrategies []v1.CoordinatedLeaseStrategy
}
func NewCandidate(clientset kubernetes.Interface,
candidateName string,
candidateNamespace string,
targetLease string,
clock clock.Clock,
binaryVersion, emulationVersion string,
preferredStrategies []v1.CoordinatedLeaseStrategy,
) (*LeaseCandidate, error) {
fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String()
// A separate informer factory is required because this informer must start
// before the informer factories of leader-elected components are started.
informerFactory := informers.NewSharedInformerFactoryWithOptions(
clientset, 5*time.Minute,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector
}),
)
leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer()
lc := &LeaseCandidate{
LeaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace),
LeaseCandidateInformer: leaseCandidateInformer,
InformerFactory: informerFactory,
name: candidateName,
namespace: candidateNamespace,
leaseName: targetLease,
Clock: clock,
binaryVersion: binaryVersion,
emulationVersion: emulationVersion,
preferredStrategies: preferredStrategies,
}
lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"})
synced, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok {
if leasecandidate.Spec.PingTime != nil {
lc.enqueueLease()
}
}
},
})
if err != nil {
return nil, err
}
lc.HasSynced = synced.HasSynced
return lc, nil
}
func (c *LeaseCandidate) Run(ctx context.Context) {
defer c.queue.ShutDown()
go c.InformerFactory.Start(ctx.Done())
if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.HasSynced) {
return
}
c.enqueueLease()
go c.runWorker(ctx)
<-ctx.Done()
}
func (c *LeaseCandidate) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) bool {
key, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(key)
err := c.ensureLease(ctx)
if err == nil {
c.queue.AddAfter(key, requeueInterval)
return true
}
utilruntime.HandleError(err)
klog.Infof("processNextWorkItem.AddRateLimited: %v", key)
c.queue.AddRateLimited(key)
return true
}
func (c *LeaseCandidate) enqueueLease() {
c.queue.Add(0)
}
// ensureLease creates the lease candidate if it does not exist and renews it if it
// does exist. Returns any error that occurs.
func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
lease, err := c.LeaseClient.Get(ctx, c.name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
klog.V(2).Infof("Creating lease candidate")
// lease does not exist, create it.
leaseToCreate := c.newLease()
_, err := c.LeaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
if err != nil {
return err
}
klog.V(2).Infof("Created lease candidate")
return nil
} else if err != nil {
return err
}
klog.V(2).Infof("lease candidate exists.. renewing")
clone := lease.DeepCopy()
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
clone.Spec.PingTime = nil
_, err = c.LeaseClient.Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate {
lease := &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: c.name,
Namespace: c.namespace,
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: c.leaseName,
BinaryVersion: c.binaryVersion,
EmulationVersion: c.emulationVersion,
PreferredStrategies: c.preferredStrategies,
},
}
lease.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
return lease
}
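
A usage sketch of NewCandidate, mirroring the component wiring elsewhere in this commit. The identity, namespace, lease name, and versions below are illustrative, and the import path is assumed to be the client-go leaderelection package this file is added to.

package main

import (
	"context"

	v1 "k8s.io/api/coordination/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection" // assumed import path for NewCandidate
	"k8s.io/utils/clock"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Publish a LeaseCandidate for this replica; the CLE controller picks the
	// leader among the candidates registered for the target lease.
	candidate, err := leaderelection.NewCandidate(
		client,
		"my-component-replica-a", // candidate name, unique per replica
		"kube-system",            // candidate namespace
		"my-component",           // target lease the candidates compete for
		clock.RealClock{},
		"1.31.0", // binary version
		"1.31.0", // emulation version
		[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
	)
	if err != nil {
		panic(err)
	}
	go candidate.Run(context.Background())
	// ...start the leader elector with Coordinated: true alongside this...
	select {}
}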

View File

@@ -0,0 +1,146 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"context"
"testing"
"time"
v1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/clock"
)
type testcase struct {
candidateName, candidateNamespace, leaseName string
binaryVersion, emulationVersion string
}
func TestLeaseCandidateCreation(t *testing.T) {
tc := testcase{
candidateName: "foo",
candidateNamespace: "default",
leaseName: "lease",
binaryVersion: "1.30.0",
emulationVersion: "1.30.0",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
client := fake.NewSimpleClientset()
candidate, err := NewCandidate(
client,
tc.candidateName,
tc.candidateNamespace,
tc.leaseName,
clock.RealClock{},
tc.binaryVersion,
tc.emulationVersion,
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
)
if err != nil {
t.Fatal(err)
}
go candidate.Run(ctx)
err = pollForLease(ctx, tc, client, nil)
if err != nil {
t.Fatal(err)
}
}
func TestLeaseCandidateAck(t *testing.T) {
tc := testcase{
candidateName: "foo",
candidateNamespace: "default",
leaseName: "lease",
binaryVersion: "1.30.0",
emulationVersion: "1.30.0",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
client := fake.NewSimpleClientset()
candidate, err := NewCandidate(
client,
tc.candidateName,
tc.candidateNamespace,
tc.leaseName,
clock.RealClock{},
tc.binaryVersion,
tc.emulationVersion,
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
)
if err != nil {
t.Fatal(err)
}
go candidate.Run(ctx)
err = pollForLease(ctx, tc, client, nil)
if err != nil {
t.Fatal(err)
}
// Update PingTime and verify that the client renews
ensureAfter := &metav1.MicroTime{Time: time.Now()}
lc, err := client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Get(ctx, tc.candidateName, metav1.GetOptions{})
if err == nil {
if lc.Spec.PingTime == nil {
c := lc.DeepCopy()
c.Spec.PingTime = &metav1.MicroTime{Time: time.Now()}
_, err = client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Update(ctx, c, metav1.UpdateOptions{})
if err != nil {
t.Error(err)
}
}
}
err = pollForLease(ctx, tc, client, ensureAfter)
if err != nil {
t.Fatal(err)
}
}
func pollForLease(ctx context.Context, tc testcase, client *fake.Clientset, t *metav1.MicroTime) error {
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
lc, err := client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Get(ctx, tc.candidateName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return true, err
}
if lc.Spec.BinaryVersion == tc.binaryVersion &&
lc.Spec.EmulationVersion == tc.emulationVersion &&
lc.Spec.LeaseName == tc.leaseName &&
lc.Spec.PingTime == nil &&
lc.Spec.RenewTime != nil {
// Ensure that if a time is provided, the renewTime occurred after the provided time.
if t != nil && t.After(lc.Spec.RenewTime.Time) {
return false, nil
}
return true, nil
}
return false, nil
})
}

View File

@@ -19,14 +19,15 @@ package resourcelock
import (
"context"
"fmt"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"time"
v1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
)
const (
@@ -114,11 +115,13 @@ type LeaderElectionRecord struct {
// attempt to acquire leases with empty identities and will wait for the full lease
// interval to expire before attempting to reacquire. This value is set to empty when
// a client voluntarily steps down.
HolderIdentity string `json:"holderIdentity"`
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime metav1.Time `json:"acquireTime"`
RenewTime metav1.Time `json:"renewTime"`
LeaderTransitions int `json:"leaderTransitions"`
HolderIdentity string `json:"holderIdentity"`
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime metav1.Time `json:"acquireTime"`
RenewTime metav1.Time `json:"renewTime"`
LeaderTransitions int `json:"leaderTransitions"`
Strategy v1.CoordinatedLeaseStrategy `json:"strategy"`
PreferredHolder string `json:"preferredHolder"`
}
// EventRecorder records a change in the ResourceLock.

View File

@@ -122,6 +122,12 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec
if spec.RenewTime != nil {
r.RenewTime = metav1.Time{Time: spec.RenewTime.Time}
}
if spec.PreferredHolder != nil {
r.PreferredHolder = *spec.PreferredHolder
}
if spec.Strategy != nil {
r.Strategy = *spec.Strategy
}
return &r
}
@@ -129,11 +135,18 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec
func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec {
leaseDurationSeconds := int32(ler.LeaseDurationSeconds)
leaseTransitions := int32(ler.LeaderTransitions)
return coordinationv1.LeaseSpec{
spec := coordinationv1.LeaseSpec{
HolderIdentity: &ler.HolderIdentity,
LeaseDurationSeconds: &leaseDurationSeconds,
AcquireTime: &metav1.MicroTime{Time: ler.AcquireTime.Time},
RenewTime: &metav1.MicroTime{Time: ler.RenewTime.Time},
LeaseTransitions: &leaseTransitions,
}
if ler.PreferredHolder != "" {
spec.PreferredHolder = &ler.PreferredHolder
}
if ler.Strategy != "" {
spec.Strategy = &ler.Strategy
}
return spec
}
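
A small round-trip sketch of the new Strategy and PreferredHolder fields, assuming the exported resourcelock helpers keep the signatures shown above; the holder identities are made up.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	now := metav1.Time{Time: time.Now()}
	record := rl.LeaderElectionRecord{
		HolderIdentity:       "kcm-a_1234", // made-up identity
		LeaseDurationSeconds: 15,
		AcquireTime:          now,
		RenewTime:            now,
		Strategy:             v1.OldestEmulationVersion,
		PreferredHolder:      "kcm-b_5678", // set when a better candidate exists
	}

	// Strategy and PreferredHolder are copied into the LeaseSpec only when
	// non-empty, and are read back symmetrically on the way in.
	spec := rl.LeaderElectionRecordToLeaseSpec(&record)
	back := rl.LeaseSpecToLeaderElectionRecord(&spec)
	fmt.Println(back.Strategy, back.PreferredHolder)
}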

View File

@@ -0,0 +1,296 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubernetes "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestSingleLeaseCandidate(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0")
cletest.pollForLease("foo", "default", "foo1")
}
func TestMultipleLeaseCandidate(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0")
go cletest.createAndRunFakeController("foo2", "default", "foo", "1.20.0", "1.19.0")
go cletest.createAndRunFakeController("foo3", "default", "foo", "1.19.0", "1.19.0")
go cletest.createAndRunFakeController("foo4", "default", "foo", "1.2.0", "1.19.0")
go cletest.createAndRunFakeController("foo5", "default", "foo", "1.20.0", "1.19.0")
cletest.pollForLease("foo", "default", "foo3")
}
func TestLeaderDisappear(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0")
go cletest.createAndRunFakeController("foo2", "default", "foo", "1.20.0", "1.19.0")
cletest.pollForLease("foo", "default", "foo2")
cletest.cancelController("foo2", "default")
cletest.deleteLC("foo2", "default")
cletest.pollForLease("foo", "default", "foo1")
}
func TestLeaseSwapIfBetterAvailable(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0")
cletest.pollForLease("bar", "default", "bar1")
go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0")
cletest.pollForLease("bar", "default", "bar2")
}
// TestUpgradeSkew tests that a legacy client and a CLE-aware client operating on the same lease do not cause errors
func TestUpgradeSkew(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foo")
cletest.pollForLease("foo", "default", "foo1-130")
go cletest.createAndRunFakeController("foo1-131", "default", "foo", "1.31.0", "1.31.0")
// running a newer controller should not kick out the old leader
cletest.pollForLease("foo", "default", "foo1-130")
cletest.cancelController("foo1-130", "default")
cletest.pollForLease("foo", "default", "foo1-131")
}
type ctxCancelPair struct {
ctx context.Context
cancel func()
}
type cleTest struct {
config *rest.Config
clientset *kubernetes.Clientset
t *testing.T
ctxList map[string]ctxCancelPair
}
func (t cleTest) createAndRunFakeLegacyController(name string, namespace string, targetLease string) {
ctx, cancel := context.WithCancel(context.Background())
t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel}
electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
go leaderElectAndRunUncoordinated(ctx, t.config, name, electionChecker,
namespace,
"leases",
targetLease,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Info("Elected leader, starting..")
},
OnStoppedLeading: func() {
klog.Errorf("%s Lost leadership, stopping", name)
// klog.FlushAndExit(klog.ExitFlushTimeout, 1)
},
})
}
func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) {
identityLease, err := leaderelection.NewCandidate(
t.clientset,
name,
namespace,
targetLease,
clock.RealClock{},
binaryVersion,
compatibilityVersion,
[]v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
)
if err != nil {
t.t.Error(err)
}
ctx, cancel := context.WithCancel(context.Background())
t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel}
go identityLease.Run(ctx)
electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
go leaderElectAndRunCoordinated(ctx, t.config, name, electionChecker,
namespace,
"leases",
targetLease,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Info("Elected leader, starting..")
},
OnStoppedLeading: func() {
klog.Errorf("%s Lost leadership, stopping", name)
// klog.FlushAndExit(klog.ExitFlushTimeout, 1)
},
})
}
func leaderElectAndRunUncoordinated(ctx context.Context, kubeconfig *rest.Config, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceNamespace, resourceLock, leaseName string, callbacks leaderelection.LeaderCallbacks) {
leaderElectAndRun(ctx, kubeconfig, lockIdentity, electionChecker, resourceNamespace, resourceLock, leaseName, callbacks, false)
}
func leaderElectAndRunCoordinated(ctx context.Context, kubeconfig *rest.Config, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceNamespace, resourceLock, leaseName string, callbacks leaderelection.LeaderCallbacks) {
leaderElectAndRun(ctx, kubeconfig, lockIdentity, electionChecker, resourceNamespace, resourceLock, leaseName, callbacks, true)
}
func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceNamespace, resourceLock, leaseName string, callbacks leaderelection.LeaderCallbacks, coordinated bool) {
logger := klog.FromContext(ctx)
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
resourceNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: lockIdentity,
},
kubeconfig,
5*time.Second)
if err != nil {
logger.Error(err, "Error creating lock")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: 5 * time.Second,
RenewDeadline: 3 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: callbacks,
WatchDog: electionChecker,
Name: leaseName,
Coordinated: coordinated,
})
}
func (t cleTest) pollForLease(name, namespace, holder string) {
err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 15*time.Second, true, func(ctx context.Context) (done bool, err error) {
lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
fmt.Println(err)
return false, nil
}
return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == holder, nil
})
if err != nil {
t.t.Fatalf("timeout awiting for Lease %s %s err: %v", name, namespace, err)
}
}
func (t cleTest) cancelController(name, namespace string) {
t.ctxList[name+"/"+namespace].cancel()
delete(t.ctxList, name+"/"+namespace)
}
func (t cleTest) cleanup() {
err := t.clientset.CoordinationV1().Leases("kube-system").Delete(context.TODO(), "leader-election-controller", metav1.DeleteOptions{})
if err != nil {
t.t.Error(err)
}
for _, c := range t.ctxList {
c.cancel()
}
}
func (t cleTest) deleteLC(name, namespace string) {
err := t.clientset.CoordinationV1alpha1().LeaseCandidates(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil {
t.t.Error(err)
}
}
func setupCLE(config *rest.Config, ctx context.Context, cancel func(), t *testing.T) cleTest {
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
a := ctxCancelPair{ctx, cancel}
return cleTest{
config: config,
clientset: clientset,
ctxList: map[string]ctxCancelPair{"main": a},
t: t,
}
}