mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 19:23:40 +00:00
Review feedback
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
This commit is contained in:
parent
e0c6987ca8
commit
68226b0501
@ -30,7 +30,7 @@ import (
|
|||||||
|
|
||||||
"github.com/blang/semver/v4"
|
"github.com/blang/semver/v4"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
v1 "k8s.io/api/coordination/v1"
|
coordinationv1 "k8s.io/api/coordination/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
@ -81,7 +81,6 @@ import (
|
|||||||
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
||||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||||
"k8s.io/utils/clock"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -292,27 +291,27 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
|
|||||||
return startSATokenControllerInit(ctx, controllerContext, controllerName)
|
return startSATokenControllerInit(ctx, controllerContext, controllerName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
|
||||||
ver, err := semver.ParseTolerant(version.Get().String())
|
ver, err := semver.ParseTolerant(version.Get().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
|
// Start lease candidate controller for coordinated leader election
|
||||||
// Start component identity lease management
|
leaseCandidate, waitForSync, err := leaderelection.NewCandidate(
|
||||||
leaseCandidate, err := leaderelection.NewCandidate(
|
|
||||||
c.Client,
|
c.Client,
|
||||||
id,
|
id,
|
||||||
"kube-system",
|
"kube-system",
|
||||||
"kube-controller-manager",
|
"kube-controller-manager",
|
||||||
clock.RealClock{},
|
|
||||||
ver.FinalizeVersion(),
|
ver.FinalizeVersion(),
|
||||||
ver.FinalizeVersion(), // TODO: Use compatibility version when it's available
|
ver.FinalizeVersion(), // TODO(Jefftree): Use compatibility version when it's available
|
||||||
[]v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
|
[]coordinationv1.CoordinatedLeaseStrategy{coordinationv1.OldestEmulationVersion},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
healthzHandler.AddHealthChecker(healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory))
|
healthzHandler.AddHealthChecker(healthz.NewInformerSyncHealthz(waitForSync))
|
||||||
|
|
||||||
go leaseCandidate.Run(ctx)
|
go leaseCandidate.Run(ctx)
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/blang/semver/v4"
|
"github.com/blang/semver/v4"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
coordinationv1 "k8s.io/api/coordination/v1"
|
coordinationv1 "k8s.io/api/coordination/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||||
@ -57,8 +58,6 @@ import (
|
|||||||
"k8s.io/component-base/version"
|
"k8s.io/component-base/version"
|
||||||
"k8s.io/component-base/version/verflag"
|
"k8s.io/component-base/version/verflag"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/utils/clock"
|
|
||||||
|
|
||||||
schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
|
schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
|
||||||
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
|
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
|
||||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||||
@ -221,21 +220,20 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start component identity lease management
|
// Start lease candidate controller for coordinated leader election
|
||||||
leaseCandidate, err := leaderelection.NewCandidate(
|
leaseCandidate, waitForSync, err := leaderelection.NewCandidate(
|
||||||
cc.Client,
|
cc.Client,
|
||||||
cc.LeaderElection.Lock.Identity(),
|
cc.LeaderElection.Lock.Identity(),
|
||||||
"kube-system",
|
metav1.NamespaceSystem,
|
||||||
"kube-scheduler",
|
"kube-scheduler",
|
||||||
clock.RealClock{},
|
|
||||||
binaryVersion.FinalizeVersion(),
|
binaryVersion.FinalizeVersion(),
|
||||||
emulationVersion.FinalizeVersion(),
|
emulationVersion.FinalizeVersion(),
|
||||||
[]coordinationv1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
|
[]coordinationv1.CoordinatedLeaseStrategy{coordinationv1.OldestEmulationVersion},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory))
|
readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(waitForSync))
|
||||||
go leaseCandidate.Run(ctx)
|
go leaseCandidate.Run(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1027,7 +1027,6 @@ EOF
|
|||||||
--feature-gates="${FEATURE_GATES}" \
|
--feature-gates="${FEATURE_GATES}" \
|
||||||
--authentication-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
|
--authentication-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
|
||||||
--authorization-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
|
--authorization-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
|
||||||
--leader-elect=false \
|
|
||||||
--master="https://${API_HOST}:${API_SECURE_PORT}" >"${SCHEDULER_LOG}" 2>&1 &
|
--master="https://${API_HOST}:${API_SECURE_PORT}" >"${SCHEDULER_LOG}" 2>&1 &
|
||||||
SCHEDULER_PID=$!
|
SCHEDULER_PID=$!
|
||||||
}
|
}
|
||||||
|
@ -96,7 +96,7 @@ type LeaseList struct {
|
|||||||
|
|
||||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||||
|
|
||||||
// LeaseCandidate defines a candidate for a lease object.
|
// LeaseCandidate defines a candidate for a Lease object.
|
||||||
// Candidates are created such that coordinated leader election will pick the best leader from the list of candidates.
|
// Candidates are created such that coordinated leader election will pick the best leader from the list of candidates.
|
||||||
type LeaseCandidate struct {
|
type LeaseCandidate struct {
|
||||||
metav1.TypeMeta
|
metav1.TypeMeta
|
||||||
@ -109,6 +109,7 @@ type LeaseCandidate struct {
|
|||||||
type LeaseCandidateSpec struct {
|
type LeaseCandidateSpec struct {
|
||||||
// LeaseName is the name of the lease for which this candidate is contending.
|
// LeaseName is the name of the lease for which this candidate is contending.
|
||||||
// This field is immutable.
|
// This field is immutable.
|
||||||
|
// +required
|
||||||
LeaseName string
|
LeaseName string
|
||||||
// PingTime is the last time that the server has requested the LeaseCandidate
|
// PingTime is the last time that the server has requested the LeaseCandidate
|
||||||
// to renew. It is only done during leader election to check if any
|
// to renew. It is only done during leader election to check if any
|
||||||
@ -120,16 +121,18 @@ type LeaseCandidateSpec struct {
|
|||||||
// Any time a Lease needs to do leader election, the PingTime field
|
// Any time a Lease needs to do leader election, the PingTime field
|
||||||
// is updated to signal to the LeaseCandidate that they should update
|
// is updated to signal to the LeaseCandidate that they should update
|
||||||
// the RenewTime.
|
// the RenewTime.
|
||||||
// Old LeaseCandidate objects are also garbage collected if it has been hours since the last renew.
|
// Old LeaseCandidate objects are also garbage collected if it has been hours
|
||||||
|
// since the last renew. The PingTime field is updated regularly to prevent
|
||||||
|
// garbage collection for still active LeaseCandidates.
|
||||||
// +optional
|
// +optional
|
||||||
RenewTime *metav1.MicroTime
|
RenewTime *metav1.MicroTime
|
||||||
// BinaryVersion is the binary version. It must be in a semver format without leadig `v`.
|
// BinaryVersion is the binary version. It must be in a semver format without leading `v`.
|
||||||
// This field is required when Strategy is "OldestEmulationVersion"
|
// This field is required when strategy is "OldestEmulationVersion"
|
||||||
// +optional
|
// +optional
|
||||||
BinaryVersion string
|
BinaryVersion string
|
||||||
// EmulationVersion is the emulation version. It must be in a semver format without leading `v`.
|
// EmulationVersion is the emulation version. It must be in a semver format without leading `v`.
|
||||||
// EmulationVersion must be less than or equal to BinaryVersion.
|
// EmulationVersion must be less than or equal to BinaryVersion.
|
||||||
// This field is required when Strategy is "OldestEmulationVersion"
|
// This field is required when strategy is "OldestEmulationVersion"
|
||||||
// +optional
|
// +optional
|
||||||
EmulationVersion string
|
EmulationVersion string
|
||||||
// PreferredStrategies indicates the list of strategies for picking the leader for coordinated leader election.
|
// PreferredStrategies indicates the list of strategies for picking the leader for coordinated leader election.
|
||||||
@ -137,7 +140,7 @@ type LeaseCandidateSpec struct {
|
|||||||
// leader election to make a decision about the final election strategy. This follows as
|
// leader election to make a decision about the final election strategy. This follows as
|
||||||
// - If all clients have strategy X as the first element in this list, strategy X will be used.
|
// - If all clients have strategy X as the first element in this list, strategy X will be used.
|
||||||
// - If a candidate has strategy [X] and another candidate has strategy [Y, X], Y supersedes X and strategy Y
|
// - If a candidate has strategy [X] and another candidate has strategy [Y, X], Y supersedes X and strategy Y
|
||||||
// will be used
|
// will be used.
|
||||||
// - If a candidate has strategy [X, Y] and another candidate has strategy [Y, X], this is a user error and leader
|
// - If a candidate has strategy [X, Y] and another candidate has strategy [Y, X], this is a user error and leader
|
||||||
// election will not operate the Lease until resolved.
|
// election will not operate the Lease until resolved.
|
||||||
// (Alpha) Using this field requires the CoordinatedLeaderElection feature gate to be enabled.
|
// (Alpha) Using this field requires the CoordinatedLeaderElection feature gate to be enabled.
|
||||||
|
@ -124,11 +124,11 @@ func ValidateLeaseCandidateSpec(spec *coordination.LeaseCandidateSpec, fldPath *
|
|||||||
allErrs = append(allErrs, field.Invalid(fld, spec.BinaryVersion, "must be greater than or equal to `emulationVersion`"))
|
allErrs = append(allErrs, field.Invalid(fld, spec.BinaryVersion, "must be greater than or equal to `emulationVersion`"))
|
||||||
}
|
}
|
||||||
|
|
||||||
strategySeen := make(map[coordination.CoordinatedLeaseStrategy]bool)
|
|
||||||
|
|
||||||
if len(spec.PreferredStrategies) > 0 {
|
if len(spec.PreferredStrategies) > 0 {
|
||||||
for i, strategy := range spec.PreferredStrategies {
|
for i, strategy := range spec.PreferredStrategies {
|
||||||
fld := fldPath.Child("preferredStrategies").Index(i)
|
fld := fldPath.Child("preferredStrategies").Index(i)
|
||||||
|
|
||||||
|
strategySeen := make(map[coordination.CoordinatedLeaseStrategy]bool)
|
||||||
if _, ok := strategySeen[strategy]; ok {
|
if _, ok := strategySeen[strategy]; ok {
|
||||||
allErrs = append(allErrs, field.Duplicate(fld, strategy))
|
allErrs = append(allErrs, field.Duplicate(fld, strategy))
|
||||||
} else {
|
} else {
|
||||||
@ -153,7 +153,7 @@ func ValidateLeaseCandidateSpec(spec *coordination.LeaseCandidateSpec, fldPath *
|
|||||||
return allErrs
|
return allErrs
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValidateLeaseStrategy validates the Strategy field in both the Lease and LeaseCandidate
|
// ValidateCoordinatedLeaseStrategy validates the Strategy field in both the Lease and LeaseCandidate
|
||||||
func ValidateCoordinatedLeaseStrategy(strategy coordination.CoordinatedLeaseStrategy, fldPath *field.Path) field.ErrorList {
|
func ValidateCoordinatedLeaseStrategy(strategy coordination.CoordinatedLeaseStrategy, fldPath *field.Path) field.ErrorList {
|
||||||
allErrs := field.ErrorList{}
|
allErrs := field.ErrorList{}
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||||
// +k8s:prerelease-lifecycle-gen:introduced=1.31
|
// +k8s:prerelease-lifecycle-gen:introduced=1.31
|
||||||
|
|
||||||
// LeaseCandidate defines a candidate for a lease object.
|
// LeaseCandidate defines a candidate for a Lease object.
|
||||||
// Candidates are created such that coordinated leader election will pick the best leader from the list of candidates.
|
// Candidates are created such that coordinated leader election will pick the best leader from the list of candidates.
|
||||||
type LeaseCandidate struct {
|
type LeaseCandidate struct {
|
||||||
metav1.TypeMeta `json:",inline"`
|
metav1.TypeMeta `json:",inline"`
|
||||||
@ -39,7 +39,7 @@ type LeaseCandidate struct {
|
|||||||
Spec LeaseCandidateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
|
Spec LeaseCandidateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// LeaseSpec is a specification of a Lease.
|
// LeaseCandidateSpec is a specification of a Lease.
|
||||||
type LeaseCandidateSpec struct {
|
type LeaseCandidateSpec struct {
|
||||||
// LeaseName is the name of the lease for which this candidate is contending.
|
// LeaseName is the name of the lease for which this candidate is contending.
|
||||||
// This field is immutable.
|
// This field is immutable.
|
||||||
@ -55,16 +55,18 @@ type LeaseCandidateSpec struct {
|
|||||||
// Any time a Lease needs to do leader election, the PingTime field
|
// Any time a Lease needs to do leader election, the PingTime field
|
||||||
// is updated to signal to the LeaseCandidate that they should update
|
// is updated to signal to the LeaseCandidate that they should update
|
||||||
// the RenewTime.
|
// the RenewTime.
|
||||||
// Old LeaseCandidate objects are also garbage collected if it has been hours since the last renew.
|
// Old LeaseCandidate objects are also garbage collected if it has been hours
|
||||||
|
// since the last renew. The PingTime field is updated regularly to prevent
|
||||||
|
// garbage collection for still active LeaseCandidates.
|
||||||
// +optional
|
// +optional
|
||||||
RenewTime *metav1.MicroTime `json:"renewTime,omitempty" protobuf:"bytes,3,opt,name=renewTime"`
|
RenewTime *metav1.MicroTime `json:"renewTime,omitempty" protobuf:"bytes,3,opt,name=renewTime"`
|
||||||
// BinaryVersion is the binary version. It must be in a semver format without leading `v`.
|
// BinaryVersion is the binary version. It must be in a semver format without leading `v`.
|
||||||
// This field is required when Strategy is "OldestEmulationVersion"
|
// This field is required when strategy is "OldestEmulationVersion"
|
||||||
// +optional
|
// +optional
|
||||||
BinaryVersion string `json:"binaryVersion,omitempty" protobuf:"bytes,4,opt,name=binaryVersion"`
|
BinaryVersion string `json:"binaryVersion,omitempty" protobuf:"bytes,4,opt,name=binaryVersion"`
|
||||||
// EmulationVersion is the emulation version. It must be in a semver format without leading `v`.
|
// EmulationVersion is the emulation version. It must be in a semver format without leading `v`.
|
||||||
// EmulationVersion must be less than or equal to BinaryVersion.
|
// EmulationVersion must be less than or equal to BinaryVersion.
|
||||||
// This field is required when Strategy is "OldestEmulationVersion"
|
// This field is required when strategy is "OldestEmulationVersion"
|
||||||
// +optional
|
// +optional
|
||||||
EmulationVersion string `json:"emulationVersion,omitempty" protobuf:"bytes,5,opt,name=emulationVersion"`
|
EmulationVersion string `json:"emulationVersion,omitempty" protobuf:"bytes,5,opt,name=emulationVersion"`
|
||||||
// PreferredStrategies indicates the list of strategies for picking the leader for coordinated leader election.
|
// PreferredStrategies indicates the list of strategies for picking the leader for coordinated leader election.
|
||||||
@ -72,7 +74,7 @@ type LeaseCandidateSpec struct {
|
|||||||
// leader election to make a decision about the final election strategy. This follows as
|
// leader election to make a decision about the final election strategy. This follows as
|
||||||
// - If all clients have strategy X as the first element in this list, strategy X will be used.
|
// - If all clients have strategy X as the first element in this list, strategy X will be used.
|
||||||
// - If a candidate has strategy [X] and another candidate has strategy [Y, X], Y supersedes X and strategy Y
|
// - If a candidate has strategy [X] and another candidate has strategy [Y, X], Y supersedes X and strategy Y
|
||||||
// will be used
|
// will be used.
|
||||||
// - If a candidate has strategy [X, Y] and another candidate has strategy [Y, X], this is a user error and leader
|
// - If a candidate has strategy [X, Y] and another candidate has strategy [Y, X], this is a user error and leader
|
||||||
// election will not operate the Lease until resolved.
|
// election will not operate the Lease until resolved.
|
||||||
// (Alpha) Using this field requires the CoordinatedLeaderElection feature gate to be enabled.
|
// (Alpha) Using this field requires the CoordinatedLeaderElection feature gate to be enabled.
|
||||||
|
@ -18,6 +18,7 @@ package leaderelection
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/coordination/v1"
|
v1 "k8s.io/api/coordination/v1"
|
||||||
@ -37,11 +38,15 @@ import (
|
|||||||
|
|
||||||
const requeueInterval = 5 * time.Minute
|
const requeueInterval = 5 * time.Minute
|
||||||
|
|
||||||
|
type CacheSyncWaiter interface {
|
||||||
|
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
|
||||||
|
}
|
||||||
|
|
||||||
type LeaseCandidate struct {
|
type LeaseCandidate struct {
|
||||||
LeaseClient coordinationv1alpha1client.LeaseCandidateInterface
|
leaseClient coordinationv1alpha1client.LeaseCandidateInterface
|
||||||
LeaseCandidateInformer cache.SharedIndexInformer
|
leaseCandidateInformer cache.SharedIndexInformer
|
||||||
InformerFactory informers.SharedInformerFactory
|
informerFactory informers.SharedInformerFactory
|
||||||
HasSynced cache.InformerSynced
|
hasSynced cache.InformerSynced
|
||||||
|
|
||||||
// At most there will be one item in this Queue (since we only watch one item)
|
// At most there will be one item in this Queue (since we only watch one item)
|
||||||
queue workqueue.TypedRateLimitingInterface[int]
|
queue workqueue.TypedRateLimitingInterface[int]
|
||||||
@ -52,7 +57,7 @@ type LeaseCandidate struct {
|
|||||||
// controller lease
|
// controller lease
|
||||||
leaseName string
|
leaseName string
|
||||||
|
|
||||||
Clock clock.Clock
|
clock clock.Clock
|
||||||
|
|
||||||
binaryVersion, emulationVersion string
|
binaryVersion, emulationVersion string
|
||||||
preferredStrategies []v1.CoordinatedLeaseStrategy
|
preferredStrategies []v1.CoordinatedLeaseStrategy
|
||||||
@ -62,10 +67,9 @@ func NewCandidate(clientset kubernetes.Interface,
|
|||||||
candidateName string,
|
candidateName string,
|
||||||
candidateNamespace string,
|
candidateNamespace string,
|
||||||
targetLease string,
|
targetLease string,
|
||||||
clock clock.Clock,
|
|
||||||
binaryVersion, emulationVersion string,
|
binaryVersion, emulationVersion string,
|
||||||
preferredStrategies []v1.CoordinatedLeaseStrategy,
|
preferredStrategies []v1.CoordinatedLeaseStrategy,
|
||||||
) (*LeaseCandidate, error) {
|
) (*LeaseCandidate, CacheSyncWaiter, error) {
|
||||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String()
|
fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String()
|
||||||
// A separate informer factory is required because this must start before informerFactories
|
// A separate informer factory is required because this must start before informerFactories
|
||||||
// are started for leader elected components
|
// are started for leader elected components
|
||||||
@ -78,20 +82,20 @@ func NewCandidate(clientset kubernetes.Interface,
|
|||||||
leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer()
|
leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer()
|
||||||
|
|
||||||
lc := &LeaseCandidate{
|
lc := &LeaseCandidate{
|
||||||
LeaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace),
|
leaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace),
|
||||||
LeaseCandidateInformer: leaseCandidateInformer,
|
leaseCandidateInformer: leaseCandidateInformer,
|
||||||
InformerFactory: informerFactory,
|
informerFactory: informerFactory,
|
||||||
name: candidateName,
|
name: candidateName,
|
||||||
namespace: candidateNamespace,
|
namespace: candidateNamespace,
|
||||||
leaseName: targetLease,
|
leaseName: targetLease,
|
||||||
Clock: clock,
|
clock: clock.RealClock{},
|
||||||
binaryVersion: binaryVersion,
|
binaryVersion: binaryVersion,
|
||||||
emulationVersion: emulationVersion,
|
emulationVersion: emulationVersion,
|
||||||
preferredStrategies: preferredStrategies,
|
preferredStrategies: preferredStrategies,
|
||||||
}
|
}
|
||||||
lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"})
|
lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"})
|
||||||
|
|
||||||
synced, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok {
|
if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok {
|
||||||
if leasecandidate.Spec.PingTime != nil {
|
if leasecandidate.Spec.PingTime != nil {
|
||||||
@ -101,18 +105,18 @@ func NewCandidate(clientset kubernetes.Interface,
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
lc.HasSynced = synced.HasSynced
|
lc.hasSynced = h.HasSynced
|
||||||
|
|
||||||
return lc, nil
|
return lc, informerFactory, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LeaseCandidate) Run(ctx context.Context) {
|
func (c *LeaseCandidate) Run(ctx context.Context) {
|
||||||
defer c.queue.ShutDown()
|
defer c.queue.ShutDown()
|
||||||
|
|
||||||
go c.InformerFactory.Start(ctx.Done())
|
go c.informerFactory.Start(ctx.Done())
|
||||||
if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.HasSynced) {
|
if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,12 +157,12 @@ func (c *LeaseCandidate) enqueueLease() {
|
|||||||
// ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and
|
// ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and
|
||||||
// a bool (true if this call created the lease), or any error that occurs.
|
// a bool (true if this call created the lease), or any error that occurs.
|
||||||
func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
|
func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
|
||||||
lease, err := c.LeaseClient.Get(ctx, c.name, metav1.GetOptions{})
|
lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{})
|
||||||
if apierrors.IsNotFound(err) {
|
if apierrors.IsNotFound(err) {
|
||||||
klog.V(2).Infof("Creating lease candidate")
|
klog.V(2).Infof("Creating lease candidate")
|
||||||
// lease does not exist, create it.
|
// lease does not exist, create it.
|
||||||
leaseToCreate := c.newLease()
|
leaseToCreate := c.newLease()
|
||||||
_, err := c.LeaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
|
_, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -169,9 +173,9 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
klog.V(2).Infof("lease candidate exists.. renewing")
|
klog.V(2).Infof("lease candidate exists.. renewing")
|
||||||
clone := lease.DeepCopy()
|
clone := lease.DeepCopy()
|
||||||
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
|
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
|
||||||
clone.Spec.PingTime = nil
|
clone.Spec.PingTime = nil
|
||||||
_, err = c.LeaseClient.Update(ctx, clone, metav1.UpdateOptions{})
|
_, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -191,6 +195,6 @@ func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate {
|
|||||||
PreferredStrategies: c.preferredStrategies,
|
PreferredStrategies: c.preferredStrategies,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
lease.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
|
lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
|
||||||
return lease
|
return lease
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,6 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/utils/clock"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type testcase struct {
|
type testcase struct {
|
||||||
@ -47,12 +46,11 @@ func TestLeaseCandidateCreation(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
client := fake.NewSimpleClientset()
|
client := fake.NewSimpleClientset()
|
||||||
candidate, err := NewCandidate(
|
candidate, _, err := NewCandidate(
|
||||||
client,
|
client,
|
||||||
tc.candidateName,
|
tc.candidateName,
|
||||||
tc.candidateNamespace,
|
tc.candidateNamespace,
|
||||||
tc.leaseName,
|
tc.leaseName,
|
||||||
clock.RealClock{},
|
|
||||||
tc.binaryVersion,
|
tc.binaryVersion,
|
||||||
tc.emulationVersion,
|
tc.emulationVersion,
|
||||||
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
|
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
|
||||||
@ -82,12 +80,11 @@ func TestLeaseCandidateAck(t *testing.T) {
|
|||||||
|
|
||||||
client := fake.NewSimpleClientset()
|
client := fake.NewSimpleClientset()
|
||||||
|
|
||||||
candidate, err := NewCandidate(
|
candidate, _, err := NewCandidate(
|
||||||
client,
|
client,
|
||||||
tc.candidateName,
|
tc.candidateName,
|
||||||
tc.candidateNamespace,
|
tc.candidateNamespace,
|
||||||
tc.leaseName,
|
tc.leaseName,
|
||||||
clock.RealClock{},
|
|
||||||
tc.binaryVersion,
|
tc.binaryVersion,
|
||||||
tc.emulationVersion,
|
tc.emulationVersion,
|
||||||
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
|
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
|
||||||
|
@ -35,8 +35,6 @@ import (
|
|||||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/utils/clock"
|
|
||||||
|
|
||||||
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
)
|
)
|
||||||
@ -199,12 +197,11 @@ func (t cleTest) createAndRunFakeLegacyController(name string, namespace string,
|
|||||||
|
|
||||||
}
|
}
|
||||||
func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) {
|
func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) {
|
||||||
identityLease, err := leaderelection.NewCandidate(
|
identityLease, _, err := leaderelection.NewCandidate(
|
||||||
t.clientset,
|
t.clientset,
|
||||||
name,
|
name,
|
||||||
namespace,
|
namespace,
|
||||||
targetLease,
|
targetLease,
|
||||||
clock.RealClock{},
|
|
||||||
binaryVersion,
|
binaryVersion,
|
||||||
compatibilityVersion,
|
compatibilityVersion,
|
||||||
[]v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
|
[]v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
|
||||||
|
Loading…
Reference in New Issue
Block a user