mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Expand choices for using election code.
This commit is contained in:
parent
d05dad6c59
commit
4311273294
53
pkg/election/fake.go
Normal file
53
pkg/election/fake.go
Normal file
@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package election
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// Fake allows for testing of anything consuming a MasterElector.
|
||||
type Fake struct {
|
||||
mux *watch.Mux
|
||||
currentMaster Master
|
||||
lock sync.Mutex // Protect access of currentMaster
|
||||
}
|
||||
|
||||
// NewFake makes a new fake MasterElector.
|
||||
func NewFake() *Fake {
|
||||
// 0 means block for clients.
|
||||
return &Fake{mux: watch.NewMux(0)}
|
||||
}
|
||||
|
||||
func (f *Fake) ChangeMaster(newMaster Master) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.mux.Action(watch.Modified, newMaster)
|
||||
f.currentMaster = newMaster
|
||||
}
|
||||
|
||||
func (f *Fake) Elect(path, id string) watch.Interface {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
w := f.mux.Watch()
|
||||
if f.currentMaster != "" {
|
||||
f.mux.Action(watch.Modified, f.currentMaster)
|
||||
}
|
||||
return w
|
||||
}
|
@ -17,7 +17,11 @@ limitations under the License.
|
||||
package election
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// MasterElector is an interface for services that can elect masters.
|
||||
@ -32,3 +36,75 @@ type MasterElector interface {
|
||||
// and removes the caller from the election
|
||||
Elect(path, id string) watch.Interface
|
||||
}
|
||||
|
||||
// Service represents anything that can start and stop on demand.
|
||||
type Service interface {
|
||||
Start()
|
||||
Stop()
|
||||
}
|
||||
|
||||
type notifier struct {
|
||||
lock sync.Mutex
|
||||
cond *sync.Cond
|
||||
|
||||
// desired is updated with every change, current is updated after
|
||||
// Start()/Stop() finishes. 'cond' is used to signal that a change
|
||||
// might be needed. This handles the case where mastership flops
|
||||
// around without calling Start()/Stop() excessively.
|
||||
desired, current Master
|
||||
|
||||
// for comparison, to see if we are master.
|
||||
id Master
|
||||
|
||||
service Service
|
||||
}
|
||||
|
||||
// Notify runs Elect() on m, and calls Start()/Stop() on s when the
|
||||
// elected master starts/stops matching 'id'. Never returns.
|
||||
func Notify(m MasterElector, path, id string, s Service) {
|
||||
n := ¬ifier{id: Master(id), service: s}
|
||||
n.cond = sync.NewCond(&n.lock)
|
||||
go n.serviceLoop()
|
||||
for {
|
||||
w := m.Elect(path, id)
|
||||
for {
|
||||
event, open := <-w.ResultChan()
|
||||
if !open {
|
||||
break
|
||||
}
|
||||
if event.Type != watch.Modified {
|
||||
continue
|
||||
}
|
||||
electedMaster, ok := event.Object.(Master)
|
||||
if !ok {
|
||||
glog.Errorf("Unexpected object from election channel: %v", event.Object)
|
||||
break
|
||||
}
|
||||
func() {
|
||||
n.lock.Lock()
|
||||
defer n.lock.Unlock()
|
||||
n.desired = electedMaster
|
||||
if n.desired != n.current {
|
||||
n.cond.Signal()
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serviceLoop waits for changes, and calls Start()/Stop() as needed.
|
||||
func (n *notifier) serviceLoop() {
|
||||
n.lock.Lock()
|
||||
defer n.lock.Unlock()
|
||||
for {
|
||||
for n.desired == n.current {
|
||||
n.cond.Wait()
|
||||
}
|
||||
if n.current != n.id && n.desired == n.id {
|
||||
n.service.Start()
|
||||
} else if n.current == n.id && n.desired != n.id {
|
||||
n.service.Stop()
|
||||
}
|
||||
n.current = n.desired
|
||||
}
|
||||
}
|
||||
|
82
pkg/election/master_test.go
Normal file
82
pkg/election/master_test.go
Normal file
@ -0,0 +1,82 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package election
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type slowService struct {
|
||||
t *testing.T
|
||||
on bool
|
||||
// We explicitly have no lock to prove that
|
||||
// Start and Stop are not called concurrently.
|
||||
changes chan<- bool
|
||||
}
|
||||
|
||||
func (s *slowService) Start() {
|
||||
if s.on {
|
||||
s.t.Errorf("started already on service")
|
||||
}
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
s.on = true
|
||||
s.changes <- true
|
||||
}
|
||||
|
||||
func (s *slowService) Stop() {
|
||||
if !s.on {
|
||||
s.t.Errorf("stopped already off service")
|
||||
}
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
s.on = false
|
||||
s.changes <- false
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
m := NewFake()
|
||||
changes := make(chan bool, 1500)
|
||||
s := &slowService{t: t, changes: changes}
|
||||
go Notify(m, "", "me", s)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for i := 0; i < 500; i++ {
|
||||
for _, key := range []string{"me", "notme", "alsonotme"} {
|
||||
m.ChangeMaster(Master(key))
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
<-done
|
||||
time.Sleep(8 * time.Millisecond)
|
||||
close(changes)
|
||||
|
||||
changeList := []bool{}
|
||||
for {
|
||||
change, ok := <-changes
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
changeList = append(changeList, change)
|
||||
}
|
||||
|
||||
if len(changeList) > 1000 {
|
||||
t.Errorf("unexpected number of changes: %v", len(changeList))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user