fix k8sm-787 scheduler failover regression

James DeFelice 2016-02-23 17:43:00 +00:00
parent d0ce85a6d1
commit 250a511a78
2 changed files with 53 additions and 32 deletions


@@ -17,8 +17,6 @@ limitations under the License.
package election
import (
"sync"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
@@ -46,14 +44,7 @@ type Service interface {
}
type notifier struct {
changed chan struct{} // to notify the service loop about changed state
// desired is updated with every change, current is updated after
// Start()/Stop() finishes. 'cond' is used to signal that a change
// might be needed. This handles the case where mastership flops
// around without calling Start()/Stop() excessively.
desired, current Master
lock sync.Mutex // to protect the desired variable
masters chan Master // elected masters arrive here, should be buffered to better deal with rapidly flapping masters
// for comparison, to see if we are master.
id Master
@@ -64,8 +55,7 @@ type notifier struct {
// Notify runs Elect() on m, and calls Start()/Stop() on s when the
// elected master starts/stops matching 'id'. Never returns.
func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) {
n := &notifier{id: Master(id), service: s}
n.changed = make(chan struct{})
n := &notifier{id: Master(id), service: s, masters: make(chan Master, 1)}
finished := runtime.After(func() {
runtime.Until(func() {
for {
@@ -87,14 +77,21 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})
break
}
n.lock.Lock()
n.desired = electedMaster
n.lock.Unlock()
// notify serviceLoop, but don't block. If a change
// is queued already it will see the new n.desired.
select {
case n.changed <- struct{}{}:
sendElected:
for {
select {
case <-abort:
return
case n.masters <- electedMaster:
break sendElected
default: // ring full, discard old value and add the new
select {
case <-abort:
return
case <-n.masters:
default: // ring was cleared for us?!
}
}
}
}
}
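
The send loop above is a single-slot, keep-only-the-latest hand-off: the masters channel has capacity 1, and when the slot is already occupied the stale value is drained so the newest elected master always wins without blocking the election watcher. A minimal standalone sketch of that producer-side pattern, using illustrative names (publish, latest) that are not part of this patch:

package main

import "fmt"

// publish places v into the single-slot channel, discarding any value that is
// still waiting to be consumed, so the reader only ever observes the newest value.
func publish(latest chan string, abort <-chan struct{}, v string) bool {
	for {
		select {
		case <-abort:
			return false
		case latest <- v:
			return true
		default: // slot is full: drop the stale value and retry the send
			select {
			case <-abort:
				return false
			case <-latest:
			default: // someone else drained it; just retry
			}
		}
	}
}

func main() {
	latest := make(chan string, 1) // capacity 1: a mailbox holding only the newest value
	abort := make(chan struct{})

	publish(latest, abort, "master-a")
	publish(latest, abort, "master-b") // replaces "master-a" before anyone reads it

	fmt.Println(<-latest) // prints "master-b"
}
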
@@ -106,22 +103,19 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})
// serviceLoop waits for changes, and calls Start()/Stop() as needed.
func (n *notifier) serviceLoop(abort <-chan struct{}) {
var current Master
for {
select {
case <-abort:
return
case <-n.changed:
n.lock.Lock()
newDesired := n.desired // copy value to avoid race below
n.lock.Unlock()
if n.current != n.id && newDesired == n.id {
n.service.Validate(newDesired, n.current)
case desired := <-n.masters:
if current != n.id && desired == n.id {
n.service.Validate(desired, current)
n.service.Start()
} else if n.current == n.id && newDesired != n.id {
} else if current == n.id && desired != n.id {
n.service.Stop()
}
n.current = newDesired
current = desired
}
}
}
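
On the consumer side, serviceLoop now tracks the current master in a plain local variable; because every update arrives over the masters channel, there is no shared state left to guard with a mutex. A rough sketch of the same transition logic in isolation, with illustrative names (run, fakeService) and a string id in place of the Master type:

package main

import "fmt"

type fakeService struct{}

func (fakeService) Start() { fmt.Println("Start: we became master") }
func (fakeService) Stop()  { fmt.Println("Stop: we lost mastership") }

// run consumes elected-master ids and calls Start/Stop only on transitions
// that involve our own id, ignoring hand-offs between other instances.
func run(id string, masters <-chan string, abort <-chan struct{}, s fakeService) {
	var current string
	for {
		select {
		case <-abort:
			return
		case desired := <-masters:
			if current != id && desired == id {
				s.Start()
			} else if current == id && desired != id {
				s.Stop()
			}
			current = desired
		}
	}
}

func main() {
	masters := make(chan string) // unbuffered: each send is handed straight to the loop
	abort := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		run("me", masters, abort, fakeService{})
	}()

	masters <- "someone-else" // not us: no Start/Stop
	masters <- "me"           // prints "Start: we became master"
	masters <- "someone-else" // prints "Stop: we lost mastership"
	close(abort)
	<-done
}
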


@@ -43,6 +43,7 @@ import (
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
bindings "github.com/mesos/mesos-go/scheduler"
"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"golang.org/x/net/context"
@@ -594,8 +595,14 @@ func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
validation := ha.ValidationFunc(validateLeadershipTransition)
srv := ha.NewCandidate(schedulerProcess, driverFactory, validation)
path := meta.ElectionPath(s.frameworkName)
log.Infof("registering for election at %v with id %v", path, eid.GetValue())
go election.Notify(election.NewEtcdMasterElector(etcdClient), path, eid.GetValue(), srv, nil)
uuid := eid.GetValue() + ":" + uuid.New() // unique for each scheduler instance
log.Infof("registering for election at %v with id %v", path, uuid)
go election.Notify(
election.NewEtcdMasterElector(etcdClient),
path,
uuid,
srv,
nil)
} else {
log.Infoln("self-electing in non-HA mode")
schedulerProcess.Elect(driverFactory)
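
The election id registered with etcd is now the executor id plus a per-process UUID, so each scheduler instance registers under a distinct name while still carrying the configuration hash in its prefix. A tiny sketch of that composition (newElectionID is an illustrative helper, not in the patch; github.com/pborman/uuid is the package imported above):

package main

import (
	"fmt"

	"github.com/pborman/uuid"
)

// newElectionID builds <executor-id>:<scheduler-uuid>: the prefix identifies
// the configuration (stable across failovers), the suffix this one process.
func newElectionID(executorID string) string {
	return executorID + ":" + uuid.New()
}

func main() {
	fmt.Println(newElectionID("cfg-abc")) // e.g. cfg-abc:1b4e28ba-2fa1-11d2-883f-0016d3cca427
}
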
@@ -650,8 +657,28 @@ func (s *SchedulerServer) awaitFailover(schedulerProcess schedulerProcessInterfa
func validateLeadershipTransition(desired, current string) {
log.Infof("validating leadership transition")
// desired, current are of the format <executor-id>:<scheduler-uuid> (see Run()).
// parse them and ensure that executor ID's match, otherwise the cluster can get into
// a bad state after scheduler failover: executor ID is a config hash that must remain
// consistent across failover events.
var (
i = strings.LastIndex(desired, ":")
j = strings.LastIndex(current, ":")
)
if i > -1 {
desired = desired[0:i]
} else {
log.Fatalf("desired id %q is invalid", desired)
}
if j > -1 {
current = current[0:j]
} else if current != "" {
log.Fatalf("current id %q is invalid", current)
}
if desired != current && current != "" {
log.Fatalf("desired executor id != current executor id", desired, current)
log.Fatalf("desired executor id %q != current executor id %q", desired, current)
}
}
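
The new validation splits each id at its last ':' and compares only the executor-id prefixes, so a scheduler restart (new UUID suffix) is accepted while a configuration change (different executor-id hash) aborts the transition. A rough standalone sketch of that check, returning an error instead of calling log.Fatalf; the function and helper names are illustrative:

package main

import (
	"fmt"
	"strings"
)

// executorID returns the <executor-id> prefix of an id formatted as
// <executor-id>:<scheduler-uuid>; ok is false if there is no ':' separator.
func executorID(id string) (prefix string, ok bool) {
	i := strings.LastIndex(id, ":")
	if i < 0 {
		return "", false
	}
	return id[:i], true
}

// checkLeadershipTransition reports whether handing leadership from current
// to desired is safe: the executor-id prefixes must match, because that
// prefix is a config hash that has to stay stable across failover.
func checkLeadershipTransition(desired, current string) error {
	d, ok := executorID(desired)
	if !ok {
		return fmt.Errorf("desired id %q is invalid", desired)
	}
	if current == "" {
		return nil // no previous leader recorded; nothing to compare against
	}
	c, ok := executorID(current)
	if !ok {
		return fmt.Errorf("current id %q is invalid", current)
	}
	if d != c {
		return fmt.Errorf("desired executor id %q != current executor id %q", d, c)
	}
	return nil
}

func main() {
	fmt.Println(checkLeadershipTransition("cfg-abc:uuid-1", "cfg-abc:uuid-2")) // <nil>: same config, new instance
	fmt.Println(checkLeadershipTransition("cfg-def:uuid-3", "cfg-abc:uuid-2")) // error: config hash changed
}
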