diff --git a/contrib/mesos/pkg/election/master.go b/contrib/mesos/pkg/election/master.go index a5360e0abc1..a7ef790930d 100644 --- a/contrib/mesos/pkg/election/master.go +++ b/contrib/mesos/pkg/election/master.go @@ -17,8 +17,6 @@ limitations under the License. package election import ( - "sync" - "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/pkg/watch" @@ -46,14 +44,7 @@ type Service interface { } type notifier struct { - changed chan struct{} // to notify the service loop about changed state - - // desired is updated with every change, current is updated after - // Start()/Stop() finishes. 'cond' is used to signal that a change - // might be needed. This handles the case where mastership flops - // around without calling Start()/Stop() excessively. - desired, current Master - lock sync.Mutex // to protect the desired variable + masters chan Master // elected masters arrive here, should be buffered to better deal with rapidly flapping masters // for comparison, to see if we are master. id Master @@ -64,8 +55,7 @@ type notifier struct { // Notify runs Elect() on m, and calls Start()/Stop() on s when the // elected master starts/stops matching 'id'. Never returns. func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) { - n := ¬ifier{id: Master(id), service: s} - n.changed = make(chan struct{}) + n := ¬ifier{id: Master(id), service: s, masters: make(chan Master, 1)} finished := runtime.After(func() { runtime.Until(func() { for { @@ -87,14 +77,21 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) break } - n.lock.Lock() - n.desired = electedMaster - n.lock.Unlock() - - // notify serviceLoop, but don't block. If a change - // is queued already it will see the new n.desired. - select { - case n.changed <- struct{}{}: + sendElected: + for { + select { + case <-abort: + return + case n.masters <- electedMaster: + break sendElected + default: // ring full, discard old value and add the new + select { + case <-abort: + return + case <-n.masters: + default: // ring was cleared for us?! + } + } } } } @@ -106,22 +103,19 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) // serviceLoop waits for changes, and calls Start()/Stop() as needed. func (n *notifier) serviceLoop(abort <-chan struct{}) { + var current Master for { select { case <-abort: return - case <-n.changed: - n.lock.Lock() - newDesired := n.desired // copy value to avoid race below - n.lock.Unlock() - - if n.current != n.id && newDesired == n.id { - n.service.Validate(newDesired, n.current) + case desired := <-n.masters: + if current != n.id && desired == n.id { + n.service.Validate(desired, current) n.service.Start() - } else if n.current == n.id && newDesired != n.id { + } else if current == n.id && desired != n.id { n.service.Stop() } - n.current = newDesired + current = desired } } } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 691736c87a4..d1a9fc5225e 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -43,6 +43,7 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" mutil "github.com/mesos/mesos-go/mesosutil" bindings "github.com/mesos/mesos-go/scheduler" + "github.com/pborman/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" "golang.org/x/net/context" @@ -594,8 +595,14 @@ func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error { validation := ha.ValidationFunc(validateLeadershipTransition) srv := ha.NewCandidate(schedulerProcess, driverFactory, validation) path := meta.ElectionPath(s.frameworkName) - log.Infof("registering for election at %v with id %v", path, eid.GetValue()) - go election.Notify(election.NewEtcdMasterElector(etcdClient), path, eid.GetValue(), srv, nil) + uuid := eid.GetValue() + ":" + uuid.New() // unique for each scheduler instance + log.Infof("registering for election at %v with id %v", path, uuid) + go election.Notify( + election.NewEtcdMasterElector(etcdClient), + path, + uuid, + srv, + nil) } else { log.Infoln("self-electing in non-HA mode") schedulerProcess.Elect(driverFactory) @@ -650,8 +657,28 @@ func (s *SchedulerServer) awaitFailover(schedulerProcess schedulerProcessInterfa func validateLeadershipTransition(desired, current string) { log.Infof("validating leadership transition") + // desired, current are of the format : (see Run()). + // parse them and ensure that executor ID's match, otherwise the cluster can get into + // a bad state after scheduler failover: executor ID is a config hash that must remain + // consistent across failover events. + var ( + i = strings.LastIndex(desired, ":") + j = strings.LastIndex(current, ":") + ) + + if i > -1 { + desired = desired[0:i] + } else { + log.Fatalf("desired id %q is invalid", desired) + } + if j > -1 { + current = current[0:j] + } else if current != "" { + log.Fatalf("current id %q is invalid", current) + } + if desired != current && current != "" { - log.Fatalf("desired executor id != current executor id", desired, current) + log.Fatalf("desired executor id %q != current executor id %q", desired, current) } }