diff --git a/tools/leaderelection/BUILD b/tools/leaderelection/BUILD
new file mode 100644
index 00000000..53ef737d
--- /dev/null
+++ b/tools/leaderelection/BUILD
@@ -0,0 +1,58 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+    "go_test",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["leaderelection.go"],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/apis/componentconfig:go_default_library",
+        "//pkg/client/leaderelection/resourcelock:go_default_library",
+        "//vendor/github.com/golang/glog:go_default_library",
+        "//vendor/github.com/spf13/pflag:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["leaderelection_test.go"],
+    library = ":go_default_library",
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/client/clientset_generated/clientset/typed/core/v1/fake:go_default_library",
+        "//pkg/client/leaderelection/resourcelock:go_default_library",
+        "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//vendor/k8s.io/client-go/testing:go_default_library",
+        "//vendor/k8s.io/client-go/tools/record:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [
+        ":package-srcs",
+        "//pkg/client/leaderelection/resourcelock:all-srcs",
+    ],
+    tags = ["automanaged"],
+)
diff --git a/tools/leaderelection/OWNERS b/tools/leaderelection/OWNERS
new file mode 100644
index 00000000..d9516f8a
--- /dev/null
+++ b/tools/leaderelection/OWNERS
@@ -0,0 +1,13 @@
+approvers:
+- mikedanese
+- timothysc
+reviewers:
+- wojtek-t
+- deads2k
+- mikedanese
+- gmarek
+- eparis
+- timothysc
+- ingvagabund
+- resouer
+- goltermann
diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go
new file mode 100644
index 00000000..249640b3
--- /dev/null
+++ b/tools/leaderelection/leaderelection.go
@@ -0,0 +1,274 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package leaderelection implements leader election of a set of endpoints.
+// It uses an annotation in the endpoints object to store the record of the
+// election state.
+//
+// This implementation does not guarantee that only one client is acting as a
+// leader (a.k.a. fencing). A client observes timestamps captured locally to
+// infer the state of the leader election.
+// Thus the implementation is tolerant to arbitrary clock skew, but is not
+// tolerant to arbitrary clock skew rate.
+//
+// However, the level of tolerance to skew rate can be configured by setting
+// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
+// maximum tolerated ratio of time passed on the fastest node to time passed on
+// the slowest node can be approximately achieved with a configuration that sets
+// the same ratio of LeaseDuration to RenewDeadline. For example, if a user wanted
+// to tolerate some nodes progressing forward in time twice as fast as other nodes,
+// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds.
+//
+// While not required, some method of clock synchronization between nodes in the
+// cluster is highly recommended. It's important to keep in mind when configuring
+// this client that the tolerance to skew rate varies inversely to master
+// availability.
+//
+// Larger clusters often have a more lenient SLA for API latency. This should be
+// taken into account when configuring the client. The rate of leader transitions
+// should be monitored and RetryPeriod and LeaseDuration should be increased
+// until the rate is stable and acceptably low. It's important to keep in mind
+// when configuring this client that the tolerance to API latency varies inversely
+// to master availability.
+//
+// DISCLAIMER: this is an alpha API. This library will likely change significantly
+// or even be removed entirely in subsequent releases. Depend on this API at
+// your own risk.
+package leaderelection
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
+
+	"github.com/golang/glog"
+)
+
+const (
+	JitterFactor = 1.2
+)
+
+// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig
+func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
+	if lec.LeaseDuration <= lec.RenewDeadline {
+		return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
+	}
+	if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
+		return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
+	}
+	if lec.Lock == nil {
+		return nil, fmt.Errorf("lock must not be nil")
+	}
+	return &LeaderElector{
+		config: lec,
+	}, nil
+}
+
+type LeaderElectionConfig struct {
+	// Lock is the resource that will be used for locking
+	Lock rl.Interface
+
+	// LeaseDuration is the duration that non-leader candidates will
+	// wait to force acquire leadership. This is measured against time of
+	// last observed ack.
+	LeaseDuration time.Duration
+	// RenewDeadline is the duration that the acting master will retry
+	// refreshing leadership before giving up.
+	RenewDeadline time.Duration
+	// RetryPeriod is the duration the LeaderElector clients should wait
+	// between tries of actions.
+	RetryPeriod time.Duration
+
+	// Callbacks are callbacks that are triggered during certain lifecycle
+	// events of the LeaderElector
+	Callbacks LeaderCallbacks
+}
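+
+// The validation in NewLeaderElector ties the three durations together:
+// LeaseDuration > RenewDeadline > JitterFactor*RetryPeriod. A minimal sketch
+// of one spacing that passes validation (the values are illustrative
+// assumptions, not defaults of this package):
+//
+//	lec := LeaderElectionConfig{
+//		Lock:          lock,             // a hypothetical rl.Interface, see resourcelock
+//		LeaseDuration: 15 * time.Second, // must exceed RenewDeadline
+//		RenewDeadline: 10 * time.Second, // must exceed JitterFactor*RetryPeriod = 2.4s
+//		RetryPeriod:   2 * time.Second,
+//	}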
+
+// LeaderCallbacks are callbacks that are triggered during certain
+// lifecycle events of the LeaderElector. These are invoked asynchronously.
+//
+// possible future callbacks:
+//  * OnChallenge()
+type LeaderCallbacks struct {
+	// OnStartedLeading is called when a LeaderElector client starts leading
+	OnStartedLeading func(stop <-chan struct{})
+	// OnStoppedLeading is called when a LeaderElector client stops leading
+	OnStoppedLeading func()
+	// OnNewLeader is called when the client observes a leader that is
+	// not the previously observed leader. This includes the first observed
+	// leader when the client starts.
+	OnNewLeader func(identity string)
+}
+
+// LeaderElector is a leader election client.
+//
+// possible future methods:
+//  * (le *LeaderElector) IsLeader()
+//  * (le *LeaderElector) GetLeader()
+type LeaderElector struct {
+	config LeaderElectionConfig
+	// internal bookkeeping
+	observedRecord rl.LeaderElectionRecord
+	observedTime   time.Time
+	// used to implement OnNewLeader(), may lag slightly from the
+	// value observedRecord.HolderIdentity if the transition has
+	// not yet been reported.
+	reportedLeader string
+}
+
+// Run starts the leader election loop
+func (le *LeaderElector) Run() {
+	defer func() {
+		runtime.HandleCrash()
+		le.config.Callbacks.OnStoppedLeading()
+	}()
+	le.acquire()
+	stop := make(chan struct{})
+	go le.config.Callbacks.OnStartedLeading(stop)
+	le.renew()
+	close(stop)
+}
+
+// RunOrDie starts a client with the provided config or panics if the config
+// fails to validate.
+func RunOrDie(lec LeaderElectionConfig) {
+	le, err := NewLeaderElector(lec)
+	if err != nil {
+		panic(err)
+	}
+	le.Run()
+}
+
+// GetLeader returns the identity of the last observed leader or returns the empty string if
+// no leader has yet been observed.
+func (le *LeaderElector) GetLeader() string {
+	return le.observedRecord.HolderIdentity
+}
+
+// IsLeader returns true if the last observed leader was this client else returns false.
+func (le *LeaderElector) IsLeader() bool {
+	return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
+}
+
+// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds.
+func (le *LeaderElector) acquire() {
+	stop := make(chan struct{})
+	glog.Infof("attempting to acquire leader lease...")
+	wait.JitterUntil(func() {
+		succeeded := le.tryAcquireOrRenew()
+		le.maybeReportTransition()
+		desc := le.config.Lock.Describe()
+		if !succeeded {
+			glog.V(4).Infof("failed to acquire lease %v", desc)
+			return
+		}
+		le.config.Lock.RecordEvent("became leader")
+		glog.Infof("successfully acquired lease %v", desc)
+		close(stop)
+	}, le.config.RetryPeriod, JitterFactor, true, stop)
+}
+
+// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails.
+func (le *LeaderElector) renew() {
+	stop := make(chan struct{})
+	wait.Until(func() {
+		err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
+			return le.tryAcquireOrRenew(), nil
+		})
+		le.maybeReportTransition()
+		desc := le.config.Lock.Describe()
+		if err == nil {
+			glog.V(4).Infof("successfully renewed lease %v", desc)
+			return
+		}
+		le.config.Lock.RecordEvent("stopped leading")
+		glog.Infof("failed to renew lease %v", desc)
+		close(stop)
+	}, 0, stop)
+}
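+
+// Building on the config sketch above (lec), a minimal sketch of wiring the
+// callbacks and running the loop through RunOrDie:
+//
+//	lec.Callbacks = LeaderCallbacks{
+//		OnStartedLeading: func(stop <-chan struct{}) {
+//			// do leader-only work until stop is closed
+//			<-stop
+//		},
+//		OnStoppedLeading: func() {
+//			glog.Infof("leadership lost")
+//		},
+//	}
+//	RunOrDie(lec)
+//
+// Run (and therefore RunOrDie) blocks until the lease is lost, so callers that
+// want to keep contending typically re-enter the loop or exit the process.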
+
+// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
+// else it tries to renew the lease if it has already been acquired. Returns true
+// on success else returns false.
+func (le *LeaderElector) tryAcquireOrRenew() bool {
+	now := metav1.Now()
+	leaderElectionRecord := rl.LeaderElectionRecord{
+		HolderIdentity:       le.config.Lock.Identity(),
+		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
+		RenewTime:            now,
+		AcquireTime:          now,
+	}
+
+	// 1. obtain or create the ElectionRecord
+	oldLeaderElectionRecord, err := le.config.Lock.Get()
+	if err != nil {
+		if !errors.IsNotFound(err) {
+			glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+			return false
+		}
+		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
+			glog.Errorf("error initially creating leader election record: %v", err)
+			return false
+		}
+		le.observedRecord = leaderElectionRecord
+		le.observedTime = time.Now()
+		return true
+	}
+
+	// 2. Record obtained, check the Identity & Time
+	if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
+		le.observedRecord = *oldLeaderElectionRecord
+		le.observedTime = time.Now()
+	}
+	if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
+		oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() {
+		glog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
+		return false
+	}
+
+	// 3. We're going to try to update. The leaderElectionRecord is set to its default
+	// here. Let's correct it before updating.
+	if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() {
+		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
+		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
+	} else {
+		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
+	}
+
+	// update the lock itself
+	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
+		glog.Errorf("Failed to update lock: %v", err)
+		return false
+	}
+	le.observedRecord = leaderElectionRecord
+	le.observedTime = time.Now()
+	return true
+}
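+
+// For reference, the record that tryAcquireOrRenew reads and writes is stored
+// as JSON under the leader-election annotation on the lock object. A sketch of
+// what it might look like for a holder named "baz" (field values assumed):
+//
+//	{
+//		"holderIdentity": "baz",
+//		"leaseDurationSeconds": 15,
+//		"acquireTime": "2017-06-01T00:00:00Z",
+//		"renewTime": "2017-06-01T00:00:30Z",
+//		"leaderTransitions": 1
+//	}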
+
+func (l *LeaderElector) maybeReportTransition() {
+	if l.observedRecord.HolderIdentity == l.reportedLeader {
+		return
+	}
+	l.reportedLeader = l.observedRecord.HolderIdentity
+	if l.config.Callbacks.OnNewLeader != nil {
+		go l.config.Callbacks.OnNewLeader(l.reportedLeader)
+	}
+}
diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go
new file mode 100644
index 00000000..5e586463
--- /dev/null
+++ b/tools/leaderelection/leaderelection_test.go
@@ -0,0 +1,292 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package leaderelection
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake"
+	core "k8s.io/client-go/testing"
+	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
+	"k8s.io/client-go/tools/record"
+)
+
+func createLockObject(objectType string, objectMeta metav1.ObjectMeta) (obj runtime.Object) {
+	switch objectType {
+	case "endpoints":
+		obj = &v1.Endpoints{ObjectMeta: objectMeta}
+	case "configmaps":
+		obj = &v1.ConfigMap{ObjectMeta: objectMeta}
+	default:
+		panic("unexpected objType:" + objectType)
+	}
+	return
+}
+
+// Will test leader election using endpoints as the resource
+func TestTryAcquireOrRenewEndpoints(t *testing.T) {
+	testTryAcquireOrRenew(t, "endpoints")
+}
+
+func testTryAcquireOrRenew(t *testing.T, objectType string) {
+	future := time.Now().Add(1000 * time.Hour)
+	past := time.Now().Add(-1000 * time.Hour)
+
+	tests := []struct {
+		observedRecord rl.LeaderElectionRecord
+		observedTime   time.Time
+		reactors       []struct {
+			verb     string
+			reaction core.ReactionFunc
+		}
+
+		expectSuccess    bool
+		transitionLeader bool
+		outHolder        string
+	}{
+		// acquire from no object
+		{
+			reactors: []struct {
+				verb     string
+				reaction core.ReactionFunc
+			}{
+				{
+					verb: "get",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						return true, nil, errors.NewNotFound(action.(core.GetAction).GetResource().GroupResource(), action.(core.GetAction).GetName())
+					},
+				},
+				{
+					verb: "create",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						return true, action.(core.CreateAction).GetObject(), nil
+					},
+				},
+			},
+			expectSuccess: true,
+			outHolder:     "baz",
+		},
+		// acquire from unled object
+		{
+			reactors: []struct {
+				verb     string
+				reaction core.ReactionFunc
+			}{
+				{
+					verb: "get",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						objectMeta := metav1.ObjectMeta{
+							Namespace: action.GetNamespace(),
+							Name:      action.(core.GetAction).GetName(),
+						}
+						return true, createLockObject(objectType, objectMeta), nil
+					},
+				},
+				{
+					verb: "update",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						return true, action.(core.CreateAction).GetObject(), nil
+					},
+				},
+			},
+
+			expectSuccess:    true,
+			transitionLeader: true,
+			outHolder:        "baz",
+		},
+		// acquire from led, unacked object
+		{
+			reactors: []struct {
+				verb     string
+				reaction core.ReactionFunc
+			}{
+				{
+					verb: "get",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						objectMeta := metav1.ObjectMeta{
+							Namespace: action.GetNamespace(),
+							Name:      action.(core.GetAction).GetName(),
+							Annotations: map[string]string{
+								rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
+							},
+						}
+						return true, createLockObject(objectType, objectMeta), nil
+					},
+				},
+				{
+					verb: "update",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						return true, action.(core.CreateAction).GetObject(), nil
+					},
+				},
+			},
+			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
+			observedTime:   past,
+
+			expectSuccess:    true,
+			transitionLeader: true,
+			outHolder:        "baz",
+		},
+		// don't acquire from led, acked object
+		{
+			reactors: []struct {
+				verb     string
+				reaction core.ReactionFunc
+			}{
+				{
+					verb: "get",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						objectMeta := metav1.ObjectMeta{
+							Namespace: action.GetNamespace(),
+							Name:      action.(core.GetAction).GetName(),
+							Annotations: map[string]string{
+								rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
+							},
+						}
+						return true, createLockObject(objectType, objectMeta), nil
+					},
+				},
+			},
+			observedTime: future,
+
+			expectSuccess: false,
+			outHolder:     "bing",
+		},
+		// renew already acquired object
+		{
+			reactors: []struct {
+				verb     string
+				reaction core.ReactionFunc
+			}{
+				{
+					verb: "get",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						objectMeta := metav1.ObjectMeta{
+							Namespace: action.GetNamespace(),
+							Name:      action.(core.GetAction).GetName(),
+							Annotations: map[string]string{
+								rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`,
+							},
+						}
+						return true, createLockObject(objectType, objectMeta), nil
+					},
+				},
+				{
+					verb: "update",
+					reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
+						return true, action.(core.CreateAction).GetObject(), nil
+					},
+				},
+			},
+			observedTime:   future,
+			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
+
+			expectSuccess: true,
+			outHolder:     "baz",
+		},
+	}
+
+	for i, test := range tests {
+		// OnNewLeader is called async so we have to wait for it.
+		var wg sync.WaitGroup
+		wg.Add(1)
+		var reportedLeader string
+		var lock rl.Interface
+
+		objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
+		resourceLockConfig := rl.ResourceLockConfig{
+			Identity:      "baz",
+			EventRecorder: &record.FakeRecorder{},
+		}
+		c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}}
+		for _, reactor := range test.reactors {
+			c.AddReactor(reactor.verb, objectType, reactor.reaction)
+		}
+		c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
+			t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action)
+			return true, nil, fmt.Errorf("unreachable action")
+		})
+
+		switch objectType {
+		case "endpoints":
+			lock = &rl.EndpointsLock{
+				EndpointsMeta: objectMeta,
+				LockConfig:    resourceLockConfig,
+				Client:        c,
+			}
+		case "configmaps":
+			lock = &rl.ConfigMapLock{
+				ConfigMapMeta: objectMeta,
+				LockConfig:    resourceLockConfig,
+				Client:        c,
+			}
+		}
+
+		lec := LeaderElectionConfig{
+			Lock:          lock,
+			LeaseDuration: 10 * time.Second,
+			Callbacks: LeaderCallbacks{
+				OnNewLeader: func(l string) {
+					defer wg.Done()
+					reportedLeader = l
+				},
+			},
+		}
+		le := &LeaderElector{
+			config:         lec,
+			observedRecord: test.observedRecord,
+			observedTime:   test.observedTime,
+		}
+
+		if test.expectSuccess != le.tryAcquireOrRenew() {
+			t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeeded=%v]", i, !test.expectSuccess)
+		}
+
+		le.observedRecord.AcquireTime = metav1.Time{}
+		le.observedRecord.RenewTime = metav1.Time{}
+		if le.observedRecord.HolderIdentity != test.outHolder {
+			t.Errorf("[%v]expected holder:\n\t%+v\ngot:\n\t%+v", i, test.outHolder, le.observedRecord.HolderIdentity)
+		}
+		if len(test.reactors) != len(c.Actions()) {
+			t.Errorf("[%v]wrong number of api interactions", i)
+		}
+		if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
+			t.Errorf("[%v]leader should have transitioned but did not", i)
+		}
+		if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
+			t.Errorf("[%v]leader should not have transitioned but did", i)
+		}
+
+		le.maybeReportTransition()
+		wg.Wait()
+		if reportedLeader != test.outHolder {
+			t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader)
+		}
+	}
+}
+
+// Will test leader election using configmap as the resource
+func TestTryAcquireOrRenewConfigMaps(t *testing.T) {
+	testTryAcquireOrRenew(t, "configmaps")
+}
diff --git a/tools/leaderelection/resourcelock/BUILD b/tools/leaderelection/resourcelock/BUILD
new file mode 100644
index 00000000..cd098571
--- /dev/null
+++ b/tools/leaderelection/resourcelock/BUILD
@@ -0,0 +1,38 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "configmaplock.go",
+        "endpointslock.go",
+        "interface.go",
+    ],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/client/clientset_generated/clientset:go_default_library",
+        "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
+        "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/client-go/tools/record:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
diff --git a/tools/leaderelection/resourcelock/configmaplock.go b/tools/leaderelection/resourcelock/configmaplock.go
new file mode 100644
index 00000000..69e8c5a7
--- /dev/null
+++ b/tools/leaderelection/resourcelock/configmaplock.go
@@ -0,0 +1,109 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+// TODO: This is almost an exact replica of the Endpoints lock.
+// Going forward, as we self-host more and more components and use
+// ConfigMaps as the means to pass configuration data, we will likely
+// deprecate the Endpoints lock.
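+
+// A minimal sketch of constructing this lock directly, assuming a hypothetical
+// corev1client.CoreV1Interface value "coreClient" and a record.EventRecorder
+// value "recorder" (most callers should go through resourcelock.New instead):
+//
+//	lock := &ConfigMapLock{
+//		ConfigMapMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "my-component"},
+//		Client:        coreClient,
+//		LockConfig: ResourceLockConfig{
+//			Identity:      "worker-1",
+//			EventRecorder: recorder,
+//		},
+//	}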
+
+type ConfigMapLock struct {
+	// ConfigMapMeta should contain a Name and a Namespace of a
+	// ConfigMap object that the LeaderElector will attempt to lead.
+	ConfigMapMeta metav1.ObjectMeta
+	Client        corev1client.ConfigMapsGetter
+	LockConfig    ResourceLockConfig
+	cm            *v1.ConfigMap
+}
+
+// Get returns the election record from a ConfigMap Annotation
+func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, error) {
+	var record LeaderElectionRecord
+	var err error
+	cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	if cml.cm.Annotations == nil {
+		cml.cm.Annotations = make(map[string]string)
+	}
+	if recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]; found {
+		if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
+			return nil, err
+		}
+	}
+	return &record, nil
+}
+
+// Create attempts to create a LeaderElectionRecord annotation
+func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(&v1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      cml.ConfigMapMeta.Name,
+			Namespace: cml.ConfigMapMeta.Namespace,
+			Annotations: map[string]string{
+				LeaderElectionRecordAnnotationKey: string(recordBytes),
+			},
+		},
+	})
+	return err
+}
+
+// Update will update an existing annotation on a given resource.
+func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
+	if cml.cm == nil {
+		return errors.New("configmap not initialized, call get or create first")
+	}
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
+	cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(cml.cm)
+	return err
+}
+
+// RecordEvent in leader election while adding meta-data
+func (cml *ConfigMapLock) RecordEvent(s string) {
+	events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s)
+	cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events)
+}
+
+// Describe is used to convert details on current resource lock
+// into a string
+func (cml *ConfigMapLock) Describe() string {
+	return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name)
+}
+
+// Identity returns the Identity of the lock
+func (cml *ConfigMapLock) Identity() string {
+	return cml.LockConfig.Identity
+}
diff --git a/tools/leaderelection/resourcelock/endpointslock.go b/tools/leaderelection/resourcelock/endpointslock.go
new file mode 100644
index 00000000..6f7dcfb0
--- /dev/null
+++ b/tools/leaderelection/resourcelock/endpointslock.go
@@ -0,0 +1,104 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+type EndpointsLock struct {
+	// EndpointsMeta should contain a Name and a Namespace of an
+	// Endpoints object that the LeaderElector will attempt to lead.
+	EndpointsMeta metav1.ObjectMeta
+	Client        corev1client.EndpointsGetter
+	LockConfig    ResourceLockConfig
+	e             *v1.Endpoints
+}
+
+// Get returns the election record from an Endpoints Annotation
+func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
+	var record LeaderElectionRecord
+	var err error
+	el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	if el.e.Annotations == nil {
+		el.e.Annotations = make(map[string]string)
+	}
+	if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found {
+		if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
+			return nil, err
+		}
+	}
+	return &record, nil
+}
+
+// Create attempts to create a LeaderElectionRecord annotation
+func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      el.EndpointsMeta.Name,
+			Namespace: el.EndpointsMeta.Namespace,
+			Annotations: map[string]string{
+				LeaderElectionRecordAnnotationKey: string(recordBytes),
+			},
+		},
+	})
+	return err
+}
+
+// Update will update an existing annotation on a given resource.
+func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
+	if el.e == nil {
+		return errors.New("endpoint not initialized, call get or create first")
+	}
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
+	el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(el.e)
+	return err
+}
+
+// RecordEvent in leader election while adding meta-data
+func (el *EndpointsLock) RecordEvent(s string) {
+	events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s)
+	el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events)
+}
+
+// Describe is used to convert details on current resource lock
+// into a string
+func (el *EndpointsLock) Describe() string {
+	return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name)
+}
+
+// Identity returns the Identity of the lock
+func (el *EndpointsLock) Identity() string {
+	return el.LockConfig.Identity
+}
diff --git a/tools/leaderelection/resourcelock/interface.go b/tools/leaderelection/resourcelock/interface.go
new file mode 100644
index 00000000..a6ea6917
--- /dev/null
+++ b/tools/leaderelection/resourcelock/interface.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"fmt"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	cs "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/record"
+)
+
+const (
+	LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
+	EndpointsResourceLock             = "endpoints"
+	ConfigMapsResourceLock            = "configmaps"
+)
+
+// LeaderElectionRecord is the record that is stored in the leader election annotation.
+// This information should be used for observational purposes only and could be replaced
+// with a random string (e.g. UUID) with only slight modification of this code.
+// TODO(mikedanese): this should potentially be versioned
+type LeaderElectionRecord struct {
+	HolderIdentity       string      `json:"holderIdentity"`
+	LeaseDurationSeconds int         `json:"leaseDurationSeconds"`
+	AcquireTime          metav1.Time `json:"acquireTime"`
+	RenewTime            metav1.Time `json:"renewTime"`
+	LeaderTransitions    int         `json:"leaderTransitions"`
+}
+
+// ResourceLockConfig is common data that exists across different
+// resource locks
+type ResourceLockConfig struct {
+	Identity      string
+	EventRecorder record.EventRecorder
+}
+
+// Interface offers a common interface for locking on arbitrary
+// resources used in leader election. The Interface is used
+// to hide the details on specific implementations in order to allow
+// them to change over time. This interface is strictly for use
+// by the leaderelection code.
+type Interface interface {
+	// Get returns the LeaderElectionRecord
+	Get() (*LeaderElectionRecord, error)
+
+	// Create attempts to create a LeaderElectionRecord
+	Create(ler LeaderElectionRecord) error
+
+	// Update will update an existing LeaderElectionRecord
+	Update(ler LeaderElectionRecord) error
+
+	// RecordEvent is used to record events
+	RecordEvent(string)
+
+	// Identity will return the lock's Identity
+	Identity() string
+
+	// Describe is used to convert details on current resource lock
+	// into a string
+	Describe() string
+}
+
+// New will create a lock of a given type according to the input parameters
+func New(lockType string, ns string, name string, client *cs.Clientset, rlc ResourceLockConfig) (Interface, error) {
+	switch lockType {
+	case EndpointsResourceLock:
+		return &EndpointsLock{
+			EndpointsMeta: metav1.ObjectMeta{
+				Namespace: ns,
+				Name:      name,
+			},
+			Client:     client,
+			LockConfig: rlc,
+		}, nil
+	case ConfigMapsResourceLock:
+		return &ConfigMapLock{
+			ConfigMapMeta: metav1.ObjectMeta{
+				Namespace: ns,
+				Name:      name,
+			},
+			Client:     client,
+			LockConfig: rlc,
+		}, nil
+	default:
+		return nil, fmt.Errorf("invalid lock-type %s", lockType)
+	}
+}
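+
+// A minimal sketch of obtaining a lock through New, assuming a hypothetical
+// *kubernetes.Clientset value "client" and a record.EventRecorder value
+// "recorder":
+//
+//	lock, err := New(EndpointsResourceLock, "kube-system", "my-component",
+//		client, ResourceLockConfig{Identity: "worker-1", EventRecorder: recorder})
+//	if err != nil {
+//		// only an unknown lockType produces an error here
+//	}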