Merge pull request #11298 from mesosphere/fix-10776

Fix deadlocks and race conditions in mesos master election notifier
This commit is contained in:
Eric Tune 2015-07-15 13:55:17 -07:00
commit f5e6161e49
2 changed files with 41 additions and 40 deletions

View File

@ -46,14 +46,14 @@ type Service interface {
} }
type notifier struct { type notifier struct {
lock sync.Mutex changed chan struct{} // to notify the service loop about changed state
cond *sync.Cond
// desired is updated with every change, current is updated after // desired is updated with every change, current is updated after
// Start()/Stop() finishes. 'cond' is used to signal that a change // Start()/Stop() finishes. 'cond' is used to signal that a change
// might be needed. This handles the case where mastership flops // might be needed. This handles the case where mastership flops
// around without calling Start()/Stop() excessively. // around without calling Start()/Stop() excessively.
desired, current Master desired, current Master
lock sync.Mutex // to protect the desired variable
// for comparison, to see if we are master. // for comparison, to see if we are master.
id Master id Master
@ -65,7 +65,7 @@ type notifier struct {
// elected master starts/stops matching 'id'. Never returns. // elected master starts/stops matching 'id'. Never returns.
func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) { func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) {
n := &notifier{id: Master(id), service: s} n := &notifier{id: Master(id), service: s}
n.cond = sync.NewCond(&n.lock) n.changed = make(chan struct{})
finished := runtime.After(func() { finished := runtime.After(func() {
runtime.Until(func() { runtime.Until(func() {
for { for {
@ -86,14 +86,16 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})
glog.Errorf("Unexpected object from election channel: %v", event.Object) glog.Errorf("Unexpected object from election channel: %v", event.Object)
break break
} }
func() {
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() n.desired = electedMaster
n.desired = electedMaster n.lock.Unlock()
if n.desired != n.current {
n.cond.Signal() // notify serviceLoop, but don't block. If a change
} // is queued already it will see the new n.desired.
}() select {
case n.changed <- struct{}{}:
}
} }
} }
} }
@ -104,31 +106,22 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})
// serviceLoop waits for changes, and calls Start()/Stop() as needed. // serviceLoop waits for changes, and calls Start()/Stop() as needed.
func (n *notifier) serviceLoop(abort <-chan struct{}) { func (n *notifier) serviceLoop(abort <-chan struct{}) {
n.lock.Lock()
defer n.lock.Unlock()
for { for {
select { select {
case <-abort: case <-abort:
return return
default: case <-n.changed:
for n.desired == n.current { n.lock.Lock()
ch := runtime.After(n.cond.Wait) newDesired := n.desired // copy value to avoid race below
select { n.lock.Unlock()
case <-abort:
n.cond.Signal() // ensure that Wait() returns if n.current != n.id && newDesired == n.id {
<-ch n.service.Validate(newDesired, n.current)
return
case <-ch:
// we were notified and have the lock, proceed..
}
}
if n.current != n.id && n.desired == n.id {
n.service.Validate(n.desired, n.current)
n.service.Start() n.service.Start()
} else if n.current == n.id && n.desired != n.id { } else if n.current == n.id && newDesired != n.id {
n.service.Stop() n.service.Stop()
} }
n.current = n.desired n.current = newDesired
} }
} }
} }

View File

@ -69,8 +69,24 @@ func Test(t *testing.T) {
changes := make(chan bool, 1500) changes := make(chan bool, 1500)
done := make(chan struct{}) done := make(chan struct{})
s := &slowService{t: t, changes: changes, done: done} s := &slowService{t: t, changes: changes, done: done}
// change master to "notme" such that the initial m.Elect call inside Notify
// will trigger an obversable event. We will wait for it to make sure the
// Notify loop will see those master changes triggered by the go routine below.
m.ChangeMaster(Master("me"))
temporaryWatch := m.mux.Watch()
ch := temporaryWatch.ResultChan()
notifyDone := runtime.After(func() { Notify(m, "", "me", s, done) }) notifyDone := runtime.After(func() { Notify(m, "", "me", s, done) })
// wait for the event triggered by the initial m.Elect of Notify. Then drain
// the channel to not block anything.
<-ch
temporaryWatch.Stop()
for i := 0; i < len(ch); i += 1 { // go 1.3 and 1.4 compatible loop
<-ch
}
go func() { go func() {
defer close(done) defer close(done)
for i := 0; i < 500; i++ { for i := 0; i < 500; i++ {
@ -83,16 +99,8 @@ func Test(t *testing.T) {
<-notifyDone <-notifyDone
close(changes) close(changes)
changeList := []bool{} changesNum := len(changes)
for { if changesNum > 1000 || changesNum == 0 {
change, ok := <-changes t.Errorf("unexpected number of changes: %v", changesNum)
if !ok {
break
}
changeList = append(changeList, change)
}
if len(changeList) > 1000 {
t.Errorf("unexpected number of changes: %v", len(changeList))
} }
} }