diff --git a/pkg/election/fake.go b/pkg/election/fake.go new file mode 100644 index 00000000000..27852b62bbf --- /dev/null +++ b/pkg/election/fake.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Fake allows for testing of anything consuming a MasterElector. +type Fake struct { + mux *watch.Mux + currentMaster Master + lock sync.Mutex // Protect access of currentMaster +} + +// NewFake makes a new fake MasterElector. +func NewFake() *Fake { + // 0 means block for clients. + return &Fake{mux: watch.NewMux(0)} +} + +func (f *Fake) ChangeMaster(newMaster Master) { + f.lock.Lock() + defer f.lock.Unlock() + f.mux.Action(watch.Modified, newMaster) + f.currentMaster = newMaster +} + +func (f *Fake) Elect(path, id string) watch.Interface { + f.lock.Lock() + defer f.lock.Unlock() + w := f.mux.Watch() + if f.currentMaster != "" { + f.mux.Action(watch.Modified, f.currentMaster) + } + return w +} diff --git a/pkg/election/master.go b/pkg/election/master.go index 14b16da5564..7cb59ffce2b 100644 --- a/pkg/election/master.go +++ b/pkg/election/master.go @@ -17,7 +17,11 @@ limitations under the License. package election import ( + "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/golang/glog" ) // MasterElector is an interface for services that can elect masters. @@ -32,3 +36,75 @@ type MasterElector interface { // and removes the caller from the election Elect(path, id string) watch.Interface } + +// Service represents anything that can start and stop on demand. +type Service interface { + Start() + Stop() +} + +type notifier struct { + lock sync.Mutex + cond *sync.Cond + + // desired is updated with every change, current is updated after + // Start()/Stop() finishes. 'cond' is used to signal that a change + // might be needed. This handles the case where mastership flops + // around without calling Start()/Stop() excessively. + desired, current Master + + // for comparison, to see if we are master. + id Master + + service Service +} + +// Notify runs Elect() on m, and calls Start()/Stop() on s when the +// elected master starts/stops matching 'id'. Never returns. +func Notify(m MasterElector, path, id string, s Service) { + n := ¬ifier{id: Master(id), service: s} + n.cond = sync.NewCond(&n.lock) + go n.serviceLoop() + for { + w := m.Elect(path, id) + for { + event, open := <-w.ResultChan() + if !open { + break + } + if event.Type != watch.Modified { + continue + } + electedMaster, ok := event.Object.(Master) + if !ok { + glog.Errorf("Unexpected object from election channel: %v", event.Object) + break + } + func() { + n.lock.Lock() + defer n.lock.Unlock() + n.desired = electedMaster + if n.desired != n.current { + n.cond.Signal() + } + }() + } + } +} + +// serviceLoop waits for changes, and calls Start()/Stop() as needed. +func (n *notifier) serviceLoop() { + n.lock.Lock() + defer n.lock.Unlock() + for { + for n.desired == n.current { + n.cond.Wait() + } + if n.current != n.id && n.desired == n.id { + n.service.Start() + } else if n.current == n.id && n.desired != n.id { + n.service.Stop() + } + n.current = n.desired + } +} diff --git a/pkg/election/master_test.go b/pkg/election/master_test.go new file mode 100644 index 00000000000..478602b5cec --- /dev/null +++ b/pkg/election/master_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "testing" + "time" +) + +type slowService struct { + t *testing.T + on bool + // We explicitly have no lock to prove that + // Start and Stop are not called concurrently. + changes chan<- bool +} + +func (s *slowService) Start() { + if s.on { + s.t.Errorf("started already on service") + } + time.Sleep(2 * time.Millisecond) + s.on = true + s.changes <- true +} + +func (s *slowService) Stop() { + if !s.on { + s.t.Errorf("stopped already off service") + } + time.Sleep(2 * time.Millisecond) + s.on = false + s.changes <- false +} + +func Test(t *testing.T) { + m := NewFake() + changes := make(chan bool, 1500) + s := &slowService{t: t, changes: changes} + go Notify(m, "", "me", s) + + done := make(chan struct{}) + go func() { + for i := 0; i < 500; i++ { + for _, key := range []string{"me", "notme", "alsonotme"} { + m.ChangeMaster(Master(key)) + } + } + close(done) + }() + + <-done + time.Sleep(8 * time.Millisecond) + close(changes) + + changeList := []bool{} + for { + change, ok := <-changes + if !ok { + break + } + changeList = append(changeList, change) + } + + if len(changeList) > 1000 { + t.Errorf("unexpected number of changes: %v", len(changeList)) + } +}