Merge pull request #21198 from mesosphere/jdef_update_mgo_latest

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-16 06:37:42 -08:00
commit dd6cc9239d
12 changed files with 640 additions and 399 deletions

32
Godeps/Godeps.json generated
View File

@ -757,43 +757,43 @@
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/auth", "ImportPath": "github.com/mesos/mesos-go/auth",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/detector", "ImportPath": "github.com/mesos/mesos-go/detector",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/executor", "ImportPath": "github.com/mesos/mesos-go/executor",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/mesosproto", "ImportPath": "github.com/mesos/mesos-go/mesosproto",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/mesosutil", "ImportPath": "github.com/mesos/mesos-go/mesosutil",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/messenger", "ImportPath": "github.com/mesos/mesos-go/messenger",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/scheduler", "ImportPath": "github.com/mesos/mesos-go/scheduler",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/upid", "ImportPath": "github.com/mesos/mesos-go/upid",
"Comment": "before-0.26-protos-14-g4a7554a", "Comment": "before-0.26-protos-29-gb755e34",
"Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f"
}, },
{ {
"ImportPath": "github.com/miekg/dns", "ImportPath": "github.com/miekg/dns",

View File

@ -3,6 +3,7 @@ package auth
import ( import (
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/mesos/mesos-go/auth/callback" "github.com/mesos/mesos-go/auth/callback"
"github.com/mesos/mesos-go/upid" "github.com/mesos/mesos-go/upid"
@ -38,8 +39,14 @@ func Login(ctx context.Context, handler callback.Handler) error {
type loginKeyType int type loginKeyType int
const ( const (
loginProviderNameKey loginKeyType = iota // name of login provider to use // name of login provider to use
parentUpidKey // upid.UPID of some parent process loginProviderNameKey loginKeyType = iota
// upid.UPID of some parent process
parentUpidKey
// time.Duration that limits the overall duration of an auth attempt
timeoutKey
) )
// Return a context that inherits all values from the parent ctx and specifies // Return a context that inherits all values from the parent ctx and specifies
@ -74,7 +81,20 @@ func ParentUPIDFrom(ctx context.Context) (pid upid.UPID, ok bool) {
func ParentUPID(ctx context.Context) (upid *upid.UPID) { func ParentUPID(ctx context.Context) (upid *upid.UPID) {
if upid, ok := ParentUPIDFrom(ctx); ok { if upid, ok := ParentUPIDFrom(ctx); ok {
return &upid return &upid
} else {
return nil
} }
return nil
}
func TimeoutFrom(ctx context.Context) (d time.Duration, ok bool) {
d, ok = ctx.Value(timeoutKey).(time.Duration)
return
}
func Timeout(ctx context.Context) (d time.Duration) {
d, _ = TimeoutFrom(ctx)
return
}
func WithTimeout(ctx context.Context, d time.Duration) context.Context {
return context.WithValue(ctx, timeoutKey, d)
} }

View File

@ -31,11 +31,16 @@ import (
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/mesosutil/process" "github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger" "github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/messenger/sessionid"
"github.com/mesos/mesos-go/upid" "github.com/mesos/mesos-go/upid"
"github.com/pborman/uuid" "github.com/pborman/uuid"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const (
defaultRecoveryTimeout = 15 * time.Minute
)
type DriverConfig struct { type DriverConfig struct {
Executor Executor Executor Executor
HostnameOverride string // optional HostnameOverride string // optional
@ -64,6 +69,7 @@ type MesosExecutorDriver struct {
directory string // TODO(yifan): Not used yet. directory string // TODO(yifan): Not used yet.
checkpoint bool checkpoint bool
recoveryTimeout time.Duration recoveryTimeout time.Duration
recoveryTimer *time.Timer
updates map[string]*mesosproto.StatusUpdate // Key is a UUID string. TODO(yifan): Not used yet. updates map[string]*mesosproto.StatusUpdate // Key is a UUID string. TODO(yifan): Not used yet.
tasks map[string]*mesosproto.TaskInfo // Key is a UUID string. TODO(yifan): Not used yet. tasks map[string]*mesosproto.TaskInfo // Key is a UUID string. TODO(yifan): Not used yet.
withExecutor func(f func(e Executor)) withExecutor func(f func(e Executor))
@ -88,12 +94,13 @@ func NewMesosExecutorDriver(config DriverConfig) (*MesosExecutorDriver, error) {
} }
driver := &MesosExecutorDriver{ driver := &MesosExecutorDriver{
status: mesosproto.Status_DRIVER_NOT_STARTED, status: mesosproto.Status_DRIVER_NOT_STARTED,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
updates: make(map[string]*mesosproto.StatusUpdate), updates: make(map[string]*mesosproto.StatusUpdate),
tasks: make(map[string]*mesosproto.TaskInfo), tasks: make(map[string]*mesosproto.TaskInfo),
workDir: ".", workDir: ".",
started: make(chan struct{}), started: make(chan struct{}),
recoveryTimeout: defaultRecoveryTimeout,
} }
driver.cond = sync.NewCond(&driver.lock) driver.cond = sync.NewCond(&driver.lock)
// decouple serialized executor callback execution from goroutines of this driver // decouple serialized executor callback execution from goroutines of this driver
@ -116,6 +123,11 @@ func NewMesosExecutorDriver(config DriverConfig) (*MesosExecutorDriver, error) {
return driver, nil return driver, nil
} }
// context returns the driver context, expects driver.lock to be locked
func (driver *MesosExecutorDriver) context() context.Context {
return sessionid.NewContext(context.TODO(), driver.connection.String())
}
// init initializes the driver. // init initializes the driver.
func (driver *MesosExecutorDriver) init() error { func (driver *MesosExecutorDriver) init() error {
log.Infof("Init mesos executor driver\n") log.Infof("Init mesos executor driver\n")
@ -127,11 +139,13 @@ func (driver *MesosExecutorDriver) init() error {
return err return err
} }
guard := func(h messenger.MessageHandler) messenger.MessageHandler { type messageHandler func(context.Context, *upid.UPID, proto.Message)
guard := func(h messageHandler) messenger.MessageHandler {
return messenger.MessageHandler(func(from *upid.UPID, pbMsg proto.Message) { return messenger.MessageHandler(func(from *upid.UPID, pbMsg proto.Message) {
driver.lock.Lock() driver.lock.Lock()
defer driver.lock.Unlock() defer driver.lock.Unlock()
h(from, pbMsg) h(driver.context(), from, pbMsg)
}) })
} }
@ -145,6 +159,7 @@ func (driver *MesosExecutorDriver) init() error {
driver.messenger.Install(guard(driver.frameworkMessage), &mesosproto.FrameworkToExecutorMessage{}) driver.messenger.Install(guard(driver.frameworkMessage), &mesosproto.FrameworkToExecutorMessage{})
driver.messenger.Install(guard(driver.shutdown), &mesosproto.ShutdownExecutorMessage{}) driver.messenger.Install(guard(driver.shutdown), &mesosproto.ShutdownExecutorMessage{})
driver.messenger.Install(guard(driver.frameworkError), &mesosproto.FrameworkErrorMessage{}) driver.messenger.Install(guard(driver.frameworkError), &mesosproto.FrameworkErrorMessage{})
driver.messenger.Install(guard(driver.networkError), &mesosproto.InternalNetworkError{})
return nil return nil
} }
@ -185,7 +200,13 @@ func (driver *MesosExecutorDriver) parseEnviroments() error {
if value == "1" { if value == "1" {
driver.checkpoint = true driver.checkpoint = true
} }
// TODO(yifan): Parse the duration. For now just use default.
/*
if driver.checkpoint {
value = os.Getenv("MESOS_RECOVERY_TIMEOUT")
}
// TODO(yifan): Parse the duration. For now just use default.
*/
return nil return nil
} }
@ -214,7 +235,73 @@ func (driver *MesosExecutorDriver) Connected() bool {
// --------------------- Message Handlers --------------------- // // --------------------- Message Handlers --------------------- //
func (driver *MesosExecutorDriver) registered(from *upid.UPID, pbMsg proto.Message) { // networkError is invoked when there's a network-level error communicating with the mesos slave.
// The driver reacts by entering a "disconnected" state and invoking the Executor.Disconnected
// callback. The assumption is that if this driver was previously connected, and then there's a
// network error, then the slave process must be dying/dead. The native driver implementation makes
// this same assumption. I have some concerns that this may be a false-positive in some situations;
// some network errors (timeouts) may be indicative of something other than a dead slave process.
func (driver *MesosExecutorDriver) networkError(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Info("ignoring network error because aborted")
return
}
if driver.connected {
driver.connected = false
msg := pbMsg.(*mesosproto.InternalNetworkError)
session := msg.GetSession()
if session != driver.connection.String() {
log.V(1).Infoln("ignoring netwok error for disconnected/stale session")
return
}
if driver.checkpoint {
log.Infoln("slave disconnected, will wait for recovery")
driver.withExecutor(func(e Executor) { e.Disconnected(driver) })
if driver.recoveryTimer != nil {
driver.recoveryTimer.Stop()
}
t := time.NewTimer(driver.recoveryTimeout)
driver.recoveryTimer = t
go func() {
select {
case <-t.C:
// timer expired
driver.lock.Lock()
defer driver.lock.Unlock()
driver.recoveryTimedOut(session)
case <-driver.stopCh:
// driver stopped
return
}
}()
return
}
}
log.Infoln("slave exited ... shutting down")
driver.withExecutor(func(e Executor) { e.Shutdown(driver) }) // abnormal shutdown
driver.abort()
}
func (driver *MesosExecutorDriver) recoveryTimedOut(connection string) {
if driver.connected {
return
}
// ensure that connection ID's match otherwise we've been re-registered
if connection == driver.connection.String() {
log.Info("recovery timeout of %v exceeded; shutting down", driver.recoveryTimeout)
driver.shutdown(driver.context(), nil, nil)
}
}
func (driver *MesosExecutorDriver) registered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring registration message from slave because aborted")
return
}
log.Infoln("Executor driver registered") log.Infoln("Executor driver registered")
msg := pbMsg.(*mesosproto.ExecutorRegisteredMessage) msg := pbMsg.(*mesosproto.ExecutorRegisteredMessage)
@ -235,7 +322,11 @@ func (driver *MesosExecutorDriver) registered(from *upid.UPID, pbMsg proto.Messa
driver.withExecutor(func(e Executor) { e.Registered(driver, executorInfo, frameworkInfo, slaveInfo) }) driver.withExecutor(func(e Executor) { e.Registered(driver, executorInfo, frameworkInfo, slaveInfo) })
} }
func (driver *MesosExecutorDriver) reregistered(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) reregistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring reregistration message from slave because aborted")
return
}
log.Infoln("Executor driver reregistered") log.Infoln("Executor driver reregistered")
msg := pbMsg.(*mesosproto.ExecutorReregisteredMessage) msg := pbMsg.(*mesosproto.ExecutorReregisteredMessage)
@ -254,11 +345,7 @@ func (driver *MesosExecutorDriver) reregistered(from *upid.UPID, pbMsg proto.Mes
driver.withExecutor(func(e Executor) { e.Reregistered(driver, slaveInfo) }) driver.withExecutor(func(e Executor) { e.Reregistered(driver, slaveInfo) })
} }
func (driver *MesosExecutorDriver) send(upid *upid.UPID, msg proto.Message) error { func (driver *MesosExecutorDriver) send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
//TODO(jdef) should implement timeout here
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
c := make(chan error, 1) c := make(chan error, 1)
go func() { c <- driver.messenger.Send(ctx, upid, msg) }() go func() { c <- driver.messenger.Send(ctx, upid, msg) }()
@ -271,7 +358,11 @@ func (driver *MesosExecutorDriver) send(upid *upid.UPID, msg proto.Message) erro
} }
} }
func (driver *MesosExecutorDriver) reconnect(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) reconnect(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring reconnect message from slave because aborted")
return
}
log.Infoln("Executor driver reconnect") log.Infoln("Executor driver reconnect")
msg := pbMsg.(*mesosproto.ReconnectExecutorMessage) msg := pbMsg.(*mesosproto.ReconnectExecutorMessage)
@ -298,12 +389,16 @@ func (driver *MesosExecutorDriver) reconnect(from *upid.UPID, pbMsg proto.Messag
message.Tasks = append(message.Tasks, t) message.Tasks = append(message.Tasks, t)
} }
// Send the message. // Send the message.
if err := driver.send(driver.slaveUPID, message); err != nil { if err := driver.send(ctx, driver.slaveUPID, message); err != nil {
log.Errorf("Failed to send %v: %v\n", message, err) log.Errorf("Failed to send %v: %v\n", message, err)
} }
} }
func (driver *MesosExecutorDriver) runTask(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) runTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring runTask message from slave because aborted")
return
}
log.Infoln("Executor driver runTask") log.Infoln("Executor driver runTask")
msg := pbMsg.(*mesosproto.RunTaskMessage) msg := pbMsg.(*mesosproto.RunTaskMessage)
@ -323,7 +418,11 @@ func (driver *MesosExecutorDriver) runTask(from *upid.UPID, pbMsg proto.Message)
driver.withExecutor(func(e Executor) { e.LaunchTask(driver, task) }) driver.withExecutor(func(e Executor) { e.LaunchTask(driver, task) })
} }
func (driver *MesosExecutorDriver) killTask(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) killTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring killTask message from slave because aborted")
return
}
log.Infoln("Executor driver killTask") log.Infoln("Executor driver killTask")
msg := pbMsg.(*mesosproto.KillTaskMessage) msg := pbMsg.(*mesosproto.KillTaskMessage)
@ -338,7 +437,11 @@ func (driver *MesosExecutorDriver) killTask(from *upid.UPID, pbMsg proto.Message
driver.withExecutor(func(e Executor) { e.KillTask(driver, taskID) }) driver.withExecutor(func(e Executor) { e.KillTask(driver, taskID) })
} }
func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring status update ack message because aborted")
return
}
log.Infoln("Executor statusUpdateAcknowledgement") log.Infoln("Executor statusUpdateAcknowledgement")
msg := pbMsg.(*mesosproto.StatusUpdateAcknowledgementMessage) msg := pbMsg.(*mesosproto.StatusUpdateAcknowledgementMessage)
@ -359,7 +462,11 @@ func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID,
delete(driver.tasks, taskID.String()) delete(driver.tasks, taskID.String())
} }
func (driver *MesosExecutorDriver) frameworkMessage(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) frameworkMessage(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring frameworkMessage message from slave because aborted")
return
}
log.Infoln("Executor driver received frameworkMessage") log.Infoln("Executor driver received frameworkMessage")
msg := pbMsg.(*mesosproto.FrameworkToExecutorMessage) msg := pbMsg.(*mesosproto.FrameworkToExecutorMessage)
@ -374,13 +481,12 @@ func (driver *MesosExecutorDriver) frameworkMessage(from *upid.UPID, pbMsg proto
driver.withExecutor(func(e Executor) { e.FrameworkMessage(driver, string(data)) }) driver.withExecutor(func(e Executor) { e.FrameworkMessage(driver, string(data)) })
} }
func (driver *MesosExecutorDriver) shutdown(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) shutdown(_ context.Context, _ *upid.UPID, _ proto.Message) {
log.Infoln("Executor driver received shutdown") if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring shutdown message because aborted")
_, ok := pbMsg.(*mesosproto.ShutdownExecutorMessage) return
if !ok {
panic("Not a ShutdownExecutorMessage! This should not happen")
} }
log.Infoln("Executor driver received shutdown")
if driver.stopped() { if driver.stopped() {
log.Infof("Ignoring shutdown message because the driver is stopped!\n") log.Infof("Ignoring shutdown message because the driver is stopped!\n")
@ -394,7 +500,11 @@ func (driver *MesosExecutorDriver) shutdown(from *upid.UPID, pbMsg proto.Message
driver.stop() driver.stop()
} }
func (driver *MesosExecutorDriver) frameworkError(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosExecutorDriver) frameworkError(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring framework error message because aborted")
return
}
log.Infoln("Executor driver received error") log.Infoln("Executor driver received error")
msg := pbMsg.(*mesosproto.FrameworkErrorMessage) msg := pbMsg.(*mesosproto.FrameworkErrorMessage)
@ -433,7 +543,7 @@ func (driver *MesosExecutorDriver) start() (mesosproto.Status, error) {
ExecutorId: driver.executorID, ExecutorId: driver.executorID,
} }
if err := driver.send(driver.slaveUPID, message); err != nil { if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorf("Stopping the executor, failed to send %v: %v\n", message, err) log.Errorf("Stopping the executor, failed to send %v: %v\n", message, err)
err0 := driver._stop(driver.status) err0 := driver._stop(driver.status)
if err0 != nil { if err0 != nil {
@ -585,7 +695,7 @@ func (driver *MesosExecutorDriver) sendStatusUpdate(taskStatus *mesosproto.TaskS
Pid: proto.String(driver.self.String()), Pid: proto.String(driver.self.String()),
} }
// Send the message. // Send the message.
if err := driver.send(driver.slaveUPID, message); err != nil { if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorf("Failed to send %v: %v\n", message, err) log.Errorf("Failed to send %v: %v\n", message, err)
return driver.status, err return driver.status, err
} }
@ -632,7 +742,7 @@ func (driver *MesosExecutorDriver) sendFrameworkMessage(data string) (mesosproto
} }
// Send the message. // Send the message.
if err := driver.send(driver.slaveUPID, message); err != nil { if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorln("Failed to send message %v: %v", message, err) log.Errorln("Failed to send message %v: %v", message, err)
return driver.status, err return driver.status, err
} }

View File

@ -22,6 +22,7 @@ It has these top-level messages:
InternalMasterChangeDetected InternalMasterChangeDetected
InternalTryAuthentication InternalTryAuthentication
InternalAuthenticationResult InternalAuthenticationResult
InternalNetworkError
FrameworkID FrameworkID
OfferID OfferID
SlaveID SlaveID

View File

@ -75,3 +75,29 @@ func (m *InternalAuthenticationResult) GetPid() string {
} }
return "" return ""
} }
type InternalNetworkError struct {
// master pid that this event pertains to
Pid *string `protobuf:"bytes,1,req,name=pid" json:"pid,omitempty"`
// driver session UUID
Session *string `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *InternalNetworkError) Reset() { *m = InternalNetworkError{} }
func (m *InternalNetworkError) String() string { return proto.CompactTextString(m) }
func (*InternalNetworkError) ProtoMessage() {}
func (m *InternalNetworkError) GetPid() string {
if m != nil && m.Pid != nil {
return *m.Pid
}
return ""
}
func (m *InternalNetworkError) GetSession() string {
if m != nil && m.Session != nil {
return *m.Session
}
return ""
}

View File

@ -21,3 +21,10 @@ message InternalAuthenticationResult {
// master pid that this result pertains to // master pid that this result pertains to
required string pid = 3; required string pid = 3;
} }
message InternalNetworkError {
// master pid that this event pertains to
required string pid = 1;
// driver session UUID
optional string session = 2;
}

View File

@ -20,9 +20,6 @@ import (
) )
const ( const (
DefaultReadTimeout = 5 * time.Second
DefaultWriteTimeout = 5 * time.Second
// writeFlushPeriod is the amount of time we're willing to wait for a single // writeFlushPeriod is the amount of time we're willing to wait for a single
// response buffer to be fully written to the underlying TCP connection; after // response buffer to be fully written to the underlying TCP connection; after
// this amount of time the remaining bytes of the response are discarded. see // this amount of time the remaining bytes of the response are discarded. see
@ -43,13 +40,8 @@ func (did *decoderID) next() decoderID {
var ( var (
errHijackFailed = errors.New("failed to hijack http connection") errHijackFailed = errors.New("failed to hijack http connection")
did decoderID // decoder ID counter did decoderID // decoder ID counter
closedChan = make(chan struct{})
) )
func init() {
close(closedChan)
}
type Decoder interface { type Decoder interface {
Requests() <-chan *Request Requests() <-chan *Request
Err() <-chan error Err() <-chan error
@ -99,8 +91,8 @@ func DecodeHTTP(w http.ResponseWriter, r *http.Request) Decoder {
req: r, req: r,
shouldQuit: make(chan struct{}), shouldQuit: make(chan struct{}),
forceQuit: make(chan struct{}), forceQuit: make(chan struct{}),
readTimeout: DefaultReadTimeout, readTimeout: ReadTimeout,
writeTimeout: DefaultWriteTimeout, writeTimeout: WriteTimeout,
idtag: id.String(), idtag: id.String(),
outCh: make(chan *bytes.Buffer), outCh: make(chan *bytes.Buffer),
} }

View File

@ -20,6 +20,7 @@ package messenger
import ( import (
"bytes" "bytes"
"crypto/tls"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -27,11 +28,9 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url"
"os" "os"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
log "github.com/golang/glog" log "github.com/golang/glog"
@ -39,30 +38,39 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const (
DefaultReadTimeout = 10 * time.Second
DefaultWriteTimeout = 10 * time.Second
)
var ( var (
ReadTimeout = DefaultReadTimeout
WriteTimeout = DefaultWriteTimeout
discardOnStopError = fmt.Errorf("discarding message because transport is shutting down") discardOnStopError = fmt.Errorf("discarding message because transport is shutting down")
errNotStarted = errors.New("HTTP transport has not been started") errNotStarted = errors.New("HTTP transport has not been started")
errTerminal = errors.New("HTTP transport is terminated") errTerminal = errors.New("HTTP transport is terminated")
errAlreadyRunning = errors.New("HTTP transport is already running") errAlreadyRunning = errors.New("HTTP transport is already running")
httpTransport, httpClient = &http.Transport{ httpTransport = http.Transport{
Dial: (&net.Dialer{ Dial: (&net.Dialer{
Timeout: 10 * time.Second, Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second, KeepAlive: 30 * time.Second,
}).Dial, }).Dial,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 10 * time.Second,
}, ResponseHeaderTimeout: DefaultReadTimeout,
&http.Client{ }
Transport: httpTransport,
Timeout: DefaultReadTimeout, // HttpClient is used for sending messages to remote processes
} HttpClient = http.Client{
Timeout: DefaultReadTimeout,
}
) )
// httpTransporter is a subset of the Transporter interface // httpTransporter is a subset of the Transporter interface
type httpTransporter interface { type httpTransporter interface {
Send(ctx context.Context, msg *Message) error Send(ctx context.Context, msg *Message) error
Recv() (*Message, error) Recv() (*Message, error)
Inject(ctx context.Context, msg *Message) error
Install(messageName string) Install(messageName string)
Start() (upid.UPID, <-chan error) Start() (upid.UPID, <-chan error)
Stop(graceful bool) error Stop(graceful bool) error
@ -80,11 +88,10 @@ type runningState struct {
/* -- not-started state */ /* -- not-started state */
func (s *notStartedState) Send(ctx context.Context, msg *Message) error { return errNotStarted } func (s *notStartedState) Send(ctx context.Context, msg *Message) error { return errNotStarted }
func (s *notStartedState) Recv() (*Message, error) { return nil, errNotStarted } func (s *notStartedState) Recv() (*Message, error) { return nil, errNotStarted }
func (s *notStartedState) Inject(ctx context.Context, msg *Message) error { return errNotStarted } func (s *notStartedState) Stop(graceful bool) error { return errNotStarted }
func (s *notStartedState) Stop(graceful bool) error { return errNotStarted } func (s *notStartedState) Install(messageName string) { s.h.install(messageName) }
func (s *notStartedState) Install(messageName string) { s.h.install(messageName) }
func (s *notStartedState) Start() (upid.UPID, <-chan error) { func (s *notStartedState) Start() (upid.UPID, <-chan error) {
s.h.state = &runningState{s} s.h.state = &runningState{s}
return s.h.start() return s.h.start()
@ -92,11 +99,10 @@ func (s *notStartedState) Start() (upid.UPID, <-chan error) {
/* -- stopped state */ /* -- stopped state */
func (s *stoppedState) Send(ctx context.Context, msg *Message) error { return errTerminal } func (s *stoppedState) Send(ctx context.Context, msg *Message) error { return errTerminal }
func (s *stoppedState) Recv() (*Message, error) { return nil, errTerminal } func (s *stoppedState) Recv() (*Message, error) { return nil, errTerminal }
func (s *stoppedState) Inject(ctx context.Context, msg *Message) error { return errTerminal } func (s *stoppedState) Stop(graceful bool) error { return errTerminal }
func (s *stoppedState) Stop(graceful bool) error { return errTerminal } func (s *stoppedState) Install(messageName string) {}
func (s *stoppedState) Install(messageName string) {}
func (s *stoppedState) Start() (upid.UPID, <-chan error) { func (s *stoppedState) Start() (upid.UPID, <-chan error) {
ch := make(chan error, 1) ch := make(chan error, 1)
ch <- errTerminal ch <- errTerminal
@ -105,9 +111,8 @@ func (s *stoppedState) Start() (upid.UPID, <-chan error) {
/* -- running state */ /* -- running state */
func (s *runningState) Send(ctx context.Context, msg *Message) error { return s.h.send(ctx, msg) } func (s *runningState) Send(ctx context.Context, msg *Message) error { return s.h.send(ctx, msg) }
func (s *runningState) Recv() (*Message, error) { return s.h.recv() } func (s *runningState) Recv() (*Message, error) { return s.h.recv() }
func (s *runningState) Inject(ctx context.Context, msg *Message) error { return s.h.inject(ctx, msg) }
func (s *runningState) Stop(graceful bool) error { func (s *runningState) Stop(graceful bool) error {
s.h.state = &stoppedState{} s.h.state = &stoppedState{}
return s.h.stop(graceful) return s.h.stop(graceful)
@ -118,6 +123,9 @@ func (s *runningState) Start() (upid.UPID, <-chan error) {
return upid.UPID{}, ch return upid.UPID{}, ch
} }
// httpOpt is a functional option type
type httpOpt func(*HTTPTransporter)
// HTTPTransporter implements the interfaces of the Transporter. // HTTPTransporter implements the interfaces of the Transporter.
type HTTPTransporter struct { type HTTPTransporter struct {
// If the host is empty("") then it will listen on localhost. // If the host is empty("") then it will listen on localhost.
@ -132,118 +140,110 @@ type HTTPTransporter struct {
shouldQuit chan struct{} shouldQuit chan struct{}
stateLock sync.RWMutex // protect lifecycle (start/stop) funcs stateLock sync.RWMutex // protect lifecycle (start/stop) funcs
state httpTransporter state httpTransporter
server *http.Server
} }
// NewHTTPTransporter creates a new http transporter with an optional binding address. // NewHTTPTransporter creates a new http transporter with an optional binding address.
func NewHTTPTransporter(upid upid.UPID, address net.IP) *HTTPTransporter { func NewHTTPTransporter(upid upid.UPID, address net.IP, opts ...httpOpt) *HTTPTransporter {
transport := httpTransport
client := HttpClient
client.Transport = &transport
mux := http.NewServeMux()
result := &HTTPTransporter{ result := &HTTPTransporter{
upid: upid, upid: upid,
messageQueue: make(chan *Message, defaultQueueSize), messageQueue: make(chan *Message, defaultQueueSize),
mux: http.NewServeMux(), mux: mux,
client: httpClient, client: &client,
tr: httpTransport, tr: &transport,
address: address, address: address,
shouldQuit: make(chan struct{}), shouldQuit: make(chan struct{}),
server: &http.Server{
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
Handler: mux,
},
}
for _, f := range opts {
f(result)
} }
result.state = &notStartedState{result} result.state = &notStartedState{result}
return result return result
} }
func ServerTLSConfig(config *tls.Config, nextProto map[string]func(*http.Server, *tls.Conn, http.Handler)) httpOpt {
return func(transport *HTTPTransporter) {
transport.server.TLSConfig = config
transport.server.TLSNextProto = nextProto
}
}
func ClientTLSConfig(config *tls.Config, handshakeTimeout time.Duration) httpOpt {
return func(transport *HTTPTransporter) {
transport.tr.TLSClientConfig = config
transport.tr.TLSHandshakeTimeout = handshakeTimeout
}
}
func (t *HTTPTransporter) getState() httpTransporter { func (t *HTTPTransporter) getState() httpTransporter {
t.stateLock.RLock() t.stateLock.RLock()
defer t.stateLock.RUnlock() defer t.stateLock.RUnlock()
return t.state return t.state
} }
// some network errors are probably recoverable, attempt to determine that here.
func isRecoverableError(err error) bool {
if urlErr, ok := err.(*url.Error); ok {
log.V(2).Infof("checking url.Error for recoverability")
return urlErr.Op == "Post" && isRecoverableError(urlErr.Err)
} else if netErr, ok := err.(*net.OpError); ok && netErr.Err != nil {
log.V(2).Infof("checking net.OpError for recoverability: %#v", err)
if netErr.Temporary() {
return true
}
//TODO(jdef) this is pretty hackish, there's probably a better way
//TODO(jdef) should also check for EHOSTDOWN and EHOSTUNREACH
return (netErr.Op == "dial" && netErr.Net == "tcp" && netErr.Err == syscall.ECONNREFUSED)
}
log.V(2).Infof("unrecoverable error: %#v", err)
return false
}
type recoverableError struct {
Err error
}
func (e *recoverableError) Error() string {
if e == nil {
return ""
}
return e.Err.Error()
}
// Send sends the message to its specified upid. // Send sends the message to its specified upid.
func (t *HTTPTransporter) Send(ctx context.Context, msg *Message) (sendError error) { func (t *HTTPTransporter) Send(ctx context.Context, msg *Message) (sendError error) {
return t.getState().Send(ctx, msg) return t.getState().Send(ctx, msg)
} }
type mesosError struct {
errorCode int
upid string
uri string
status string
}
func (e *mesosError) Error() string {
return fmt.Sprintf("master %s rejected %s, returned status %q",
e.upid, e.uri, e.status)
}
type networkError struct {
cause error
}
func (e *networkError) Error() string {
return e.cause.Error()
}
// send delivers a message to a mesos component via HTTP, returns a mesosError if the
// communication with the remote process was successful but rejected. A networkError
// error indicates that communication with the remote process failed at the network layer.
func (t *HTTPTransporter) send(ctx context.Context, msg *Message) (sendError error) { func (t *HTTPTransporter) send(ctx context.Context, msg *Message) (sendError error) {
log.V(2).Infof("Sending message to %v via http\n", msg.UPID) log.V(2).Infof("Sending message to %v via http\n", msg.UPID)
req, err := t.makeLibprocessRequest(msg) req, err := t.makeLibprocessRequest(msg)
if err != nil { if err != nil {
log.Errorf("Failed to make libprocess request: %v\n", err)
return err return err
} }
duration := 1 * time.Second
for attempt := 0; attempt < 5; attempt++ { //TODO(jdef) extract/parameterize constant
if sendError != nil {
duration *= 2
log.Warningf("attempting to recover from error '%v', waiting before retry: %v", sendError, duration)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(duration):
// ..retry request, continue
case <-t.shouldQuit:
return discardOnStopError
}
}
sendError = t.httpDo(ctx, req, func(resp *http.Response, err error) error {
if err != nil {
if isRecoverableError(err) {
return &recoverableError{Err: err}
}
log.Infof("Failed to POST: %v\n", err)
return err
}
defer resp.Body.Close()
// ensure master acknowledgement. return t.httpDo(ctx, req, func(resp *http.Response, err error) error {
if (resp.StatusCode != http.StatusOK) && if err != nil {
(resp.StatusCode != http.StatusAccepted) { log.V(1).Infof("Failed to POST: %v\n", err)
msg := fmt.Sprintf("Master %s rejected %s. Returned status %s.", return &networkError{err}
msg.UPID, msg.RequestURI(), resp.Status)
log.Warning(msg)
return fmt.Errorf(msg)
}
return nil
})
if sendError == nil {
// success
return
} else if _, ok := sendError.(*recoverableError); ok {
// recoverable, attempt backoff?
continue
} }
// unrecoverable defer resp.Body.Close()
break
} // ensure master acknowledgement.
if recoverable, ok := sendError.(*recoverableError); ok { if (resp.StatusCode != http.StatusOK) && (resp.StatusCode != http.StatusAccepted) {
sendError = recoverable.Err return &mesosError{
} errorCode: resp.StatusCode,
return upid: msg.UPID.String(),
uri: msg.RequestURI(),
status: resp.Status,
}
}
return nil
})
} }
func (t *HTTPTransporter) httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error { func (t *HTTPTransporter) httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
@ -289,30 +289,6 @@ func (t *HTTPTransporter) recv() (*Message, error) {
return nil, discardOnStopError return nil, discardOnStopError
} }
//Inject places a message into the incoming message queue.
func (t *HTTPTransporter) Inject(ctx context.Context, msg *Message) error {
return t.getState().Inject(ctx, msg)
}
func (t *HTTPTransporter) inject(ctx context.Context, msg *Message) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.shouldQuit:
return discardOnStopError
default: // continue
}
select {
case t.messageQueue <- msg:
return nil
case <-ctx.Done():
return ctx.Err()
case <-t.shouldQuit:
return discardOnStopError
}
}
// Install the request URI according to the message's name. // Install the request URI according to the message's name.
func (t *HTTPTransporter) Install(msgName string) { func (t *HTTPTransporter) Install(msgName string) {
t.getState().Install(msgName) t.getState().Install(msgName)
@ -439,12 +415,7 @@ func (t *HTTPTransporter) start() (upid.UPID, <-chan error) {
// TODO(yifan): Set read/write deadline. // TODO(yifan): Set read/write deadline.
go func() { go func() {
s := &http.Server{ err := t.server.Serve(t.listener)
ReadTimeout: DefaultReadTimeout,
WriteTimeout: DefaultWriteTimeout,
Handler: t.mux,
}
err := s.Serve(t.listener)
select { select {
case <-t.shouldQuit: case <-t.shouldQuit:
log.V(1).Infof("HTTP server stopped because of shutdown") log.V(1).Infof("HTTP server stopped because of shutdown")
@ -587,7 +558,7 @@ func (t *HTTPTransporter) makeLibprocessRequest(msg *Message) (*http.Request, er
log.V(2).Infof("libproc target URL %s", targetURL) log.V(2).Infof("libproc target URL %s", targetURL)
req, err := http.NewRequest("POST", targetURL, bytes.NewReader(msg.Bytes)) req, err := http.NewRequest("POST", targetURL, bytes.NewReader(msg.Bytes))
if err != nil { if err != nil {
log.Errorf("Failed to create request: %v\n", err) log.V(1).Infof("Failed to create request: %v\n", err)
return nil, err return nil, err
} }
if !msg.isV1API() { if !msg.isV1API() {

View File

@ -30,6 +30,7 @@ import (
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosproto/scheduler" "github.com/mesos/mesos-go/mesosproto/scheduler"
"github.com/mesos/mesos-go/mesosutil/process" "github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger/sessionid"
"github.com/mesos/mesos-go/upid" "github.com/mesos/mesos-go/upid"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -52,16 +53,19 @@ type Messenger interface {
UPID() upid.UPID UPID() upid.UPID
} }
type errorHandlerFunc func(context.Context, *Message, error) error
type dispatchFunc func(errorHandlerFunc)
// MesosMessenger is an implementation of the Messenger interface. // MesosMessenger is an implementation of the Messenger interface.
type MesosMessenger struct { type MesosMessenger struct {
upid upid.UPID upid upid.UPID
encodingQueue chan *Message sendingQueue chan dispatchFunc
sendingQueue chan *Message
installedMessages map[string]reflect.Type installedMessages map[string]reflect.Type
installedHandlers map[string]MessageHandler installedHandlers map[string]MessageHandler
stop chan struct{} stop chan struct{}
stopOnce sync.Once stopOnce sync.Once
tr Transporter tr Transporter
guardHandlers sync.RWMutex // protect simultaneous changes to messages/handlers maps
} }
// ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to // ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to
@ -137,18 +141,17 @@ func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error)
} }
// NewMesosMessenger creates a new mesos messenger. // NewMesosMessenger creates a new mesos messenger.
func NewHttp(upid upid.UPID) *MesosMessenger { func NewHttp(upid upid.UPID, opts ...httpOpt) *MesosMessenger {
return NewHttpWithBindingAddress(upid, nil) return NewHttpWithBindingAddress(upid, nil, opts...)
} }
func NewHttpWithBindingAddress(upid upid.UPID, address net.IP) *MesosMessenger { func NewHttpWithBindingAddress(upid upid.UPID, address net.IP, opts ...httpOpt) *MesosMessenger {
return New(upid, NewHTTPTransporter(upid, address)) return New(NewHTTPTransporter(upid, address, opts...))
} }
func New(upid upid.UPID, t Transporter) *MesosMessenger { func New(t Transporter) *MesosMessenger {
return &MesosMessenger{ return &MesosMessenger{
encodingQueue: make(chan *Message, defaultQueueSize), sendingQueue: make(chan dispatchFunc, defaultQueueSize),
sendingQueue: make(chan *Message, defaultQueueSize),
installedMessages: make(map[string]reflect.Type), installedMessages: make(map[string]reflect.Type),
installedHandlers: make(map[string]MessageHandler), installedHandlers: make(map[string]MessageHandler),
tr: t, tr: t,
@ -168,6 +171,10 @@ func (m *MesosMessenger) Install(handler MessageHandler, msg proto.Message) erro
if _, ok := m.installedMessages[name]; ok { if _, ok := m.installedMessages[name]; ok {
return fmt.Errorf("Message %v is already installed", name) return fmt.Errorf("Message %v is already installed", name)
} }
m.guardHandlers.Lock()
defer m.guardHandlers.Unlock()
m.installedMessages[name] = mtype.Elem() m.installedMessages[name] = mtype.Elem()
m.installedHandlers[name] = handler m.installedHandlers[name] = handler
m.tr.Install(name) m.tr.Install(name)
@ -184,12 +191,27 @@ func (m *MesosMessenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Me
} else if *upid == m.upid { } else if *upid == m.upid {
return fmt.Errorf("Send the message to self") return fmt.Errorf("Send the message to self")
} }
b, err := proto.Marshal(msg)
if err != nil {
return err
}
name := getMessageName(msg) name := getMessageName(msg)
log.V(2).Infof("Sending message %v to %v\n", name, upid) log.V(2).Infof("Sending message %v to %v\n", name, upid)
wrapped := &Message{upid, name, msg, b}
d := dispatchFunc(func(rf errorHandlerFunc) {
err := m.tr.Send(ctx, wrapped)
err = rf(ctx, wrapped, err)
if err != nil {
m.reportError("send", wrapped, err)
}
})
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case m.encodingQueue <- &Message{upid, name, msg, nil}: case m.sendingQueue <- d:
return nil return nil
} }
} }
@ -206,14 +228,18 @@ func (m *MesosMessenger) Route(ctx context.Context, upid *upid.UPID, msg proto.M
return m.Send(ctx, upid, msg) return m.Send(ctx, upid, msg)
} }
// TODO(jdef) this has an unfortunate performance impact for self-messaging. implement
// something more reasonable here.
data, err := proto.Marshal(msg)
if err != nil {
return err
}
name := getMessageName(msg) name := getMessageName(msg)
return m.tr.Inject(ctx, &Message{upid, name, msg, data}) log.V(2).Infof("routing message %q to self", name)
_, handler, ok := m.messageBinding(name)
if !ok {
return fmt.Errorf("failed to route message, no message binding for %q", name)
}
// the implication of this is that messages can be delivered to self even if the
// messenger has been stopped. is that OK?
go handler(upid, msg)
return nil
} }
// Start starts the messenger; expects to be called once and only once. // Start starts the messenger; expects to be called once and only once.
@ -231,7 +257,6 @@ func (m *MesosMessenger) Start() error {
m.upid = pid m.upid = pid
go m.sendLoop() go m.sendLoop()
go m.encodeLoop()
go m.decodeLoop() go m.decodeLoop()
// wait for a listener error or a stop signal; either way stop the messenger // wait for a listener error or a stop signal; either way stop the messenger
@ -267,7 +292,7 @@ func (m *MesosMessenger) Stop() (err error) {
defer close(m.stop) defer close(m.stop)
} }
log.Info("stopping messenger..") log.Infof("stopping messenger %v..", m.upid)
//TODO(jdef) don't hardcode the graceful flag here //TODO(jdef) don't hardcode the graceful flag here
if err2 := m.tr.Stop(true); err2 != nil && err2 != errTerminal { if err2 := m.tr.Stop(true); err2 != nil && err2 != errTerminal {
@ -283,53 +308,14 @@ func (m *MesosMessenger) UPID() upid.UPID {
return m.upid return m.upid
} }
func (m *MesosMessenger) encodeLoop() { func (m *MesosMessenger) reportError(action string, msg *Message, err error) {
for { // log message transmission errors but don't shoot the messenger.
select { // this approach essentially drops all undelivered messages on the floor.
case <-m.stop: name := ""
return if msg != nil {
case msg := <-m.encodingQueue: name = msg.Name
e := func() error {
//TODO(jdef) implement timeout for context
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
b, err := proto.Marshal(msg.ProtoMessage)
if err != nil {
return err
}
msg.Bytes = b
select {
case <-ctx.Done():
return ctx.Err()
case m.sendingQueue <- msg:
return nil
}
}()
if e != nil {
m.reportError(fmt.Errorf("Failed to enqueue message %v: %v", msg, e))
}
}
}
}
func (m *MesosMessenger) reportError(err error) {
log.V(2).Info(err)
//TODO(jdef) implement timeout for context
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
c := make(chan error, 1)
pid := m.upid
go func() { c <- m.Route(ctx, &pid, &mesos.FrameworkErrorMessage{Message: proto.String(err.Error())}) }()
select {
case <-ctx.Done():
<-c // wait for Route to return
case e := <-c:
if e != nil {
log.Errorf("failed to report error %v due to: %v", err, e)
}
} }
log.Errorf("failed to %s message %q: %+v", action, name, err)
} }
func (m *MesosMessenger) sendLoop() { func (m *MesosMessenger) sendLoop() {
@ -337,27 +323,28 @@ func (m *MesosMessenger) sendLoop() {
select { select {
case <-m.stop: case <-m.stop:
return return
case msg := <-m.sendingQueue: case f := <-m.sendingQueue:
e := func() error { f(errorHandlerFunc(func(ctx context.Context, msg *Message, err error) error {
//TODO(jdef) implement timeout for context if _, ok := err.(*networkError); ok {
ctx, cancel := context.WithCancel(context.TODO()) // if transport reports a network error, then
defer cancel() // we're probably disconnected from the remote process?
pid := msg.UPID.String()
c := make(chan error, 1) neterr := &mesos.InternalNetworkError{Pid: &pid}
go func() { c <- m.tr.Send(ctx, msg) }() sessionID, ok := sessionid.FromContext(ctx)
if ok {
select { neterr.Session = &sessionID
case <-ctx.Done(): }
// Transport layer must use the context to detect cancelled requests. log.V(1).Infof("routing network error for pid %q session %q", pid, sessionID)
<-c // wait for Send to return err2 := m.Route(ctx, &m.upid, neterr)
return ctx.Err() if err2 != nil {
case err := <-c: log.Error(err2)
return err } else {
log.V(1).Infof("swallowing raw error because we're reporting a networkError: %v", err)
return nil
}
} }
}() return err
if e != nil { }))
m.reportError(fmt.Errorf("Failed to send message %v: %v", msg.Name, e))
}
} }
} }
} }
@ -379,17 +366,42 @@ func (m *MesosMessenger) decodeLoop() {
panic(fmt.Sprintf("unexpected transport error: %v", err)) panic(fmt.Sprintf("unexpected transport error: %v", err))
} }
} }
log.V(2).Infof("Receiving message %v from %v\n", msg.Name, msg.UPID) log.V(2).Infof("Receiving message %v from %v\n", msg.Name, msg.UPID)
msg.ProtoMessage = reflect.New(m.installedMessages[msg.Name]).Interface().(proto.Message) protoMessage, handler, found := m.messageBinding(msg.Name)
if !found {
log.Warningf("no message binding for message %q", msg.Name)
continue
}
msg.ProtoMessage = protoMessage
if err := proto.Unmarshal(msg.Bytes, msg.ProtoMessage); err != nil { if err := proto.Unmarshal(msg.Bytes, msg.ProtoMessage); err != nil {
log.Errorf("Failed to unmarshal message %v: %v\n", msg, err) log.Errorf("Failed to unmarshal message %v: %v\n", msg, err)
continue continue
} }
// TODO(yifan): Catch panic.
m.installedHandlers[msg.Name](msg.UPID, msg.ProtoMessage) handler(msg.UPID, msg.ProtoMessage)
} }
} }
func (m *MesosMessenger) messageBinding(name string) (proto.Message, MessageHandler, bool) {
m.guardHandlers.RLock()
defer m.guardHandlers.RUnlock()
gotype, ok := m.installedMessages[name]
if !ok {
return nil, nil, false
}
handler, ok := m.installedHandlers[name]
if !ok {
return nil, nil, false
}
protoMessage := reflect.New(gotype).Interface().(proto.Message)
return protoMessage, handler, true
}
// getMessageName returns the name of the message in the mesos manner. // getMessageName returns the name of the message in the mesos manner.
func getMessageName(msg proto.Message) string { func getMessageName(msg proto.Message) string {
var msgName string var msgName string

View File

@ -0,0 +1,18 @@
package sessionid
import (
"golang.org/x/net/context"
)
type key int
const sessionIDKey = 0
func NewContext(ctx context.Context, sessionID string) context.Context {
return context.WithValue(ctx, sessionIDKey, sessionID)
}
func FromContext(ctx context.Context) (string, bool) {
sessionID, ok := ctx.Value(sessionIDKey).(string)
return sessionID, ok
}

View File

@ -33,11 +33,6 @@ type Transporter interface {
//Will stop receiving when transport is stopped. //Will stop receiving when transport is stopped.
Recv() (*Message, error) Recv() (*Message, error)
//Inject injects a message to the incoming queue. Must use context to
//determine cancelled requests. Injection is aborted if the transport
//is stopped.
Inject(ctx context.Context, msg *Message) error
//Install mount an handler based on incoming message name. //Install mount an handler based on incoming message name.
Install(messageName string) Install(messageName string)

View File

@ -38,19 +38,21 @@ import (
util "github.com/mesos/mesos-go/mesosutil" util "github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/mesosutil/process" "github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger" "github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/messenger/sessionid"
"github.com/mesos/mesos-go/upid" "github.com/mesos/mesos-go/upid"
"github.com/pborman/uuid" "github.com/pborman/uuid"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const ( const (
authTimeout = 5 * time.Second // timeout interval for an authentication attempt defaultAuthenticationTimeout = 30 * time.Second // timeout interval for an authentication attempt
registrationRetryIntervalMax = float64(1 * time.Minute) registrationRetryIntervalMax = float64(1 * time.Minute)
registrationBackoffFactor = 2 * time.Second registrationBackoffFactor = 2 * time.Second
) )
var ( var (
authenticationCanceledError = errors.New("authentication canceled") ErrDisconnected = errors.New("disconnected from mesos master")
errAuthenticationCanceled = errors.New("authentication canceled")
) )
type ErrDriverAborted struct { type ErrDriverAborted struct {
@ -147,7 +149,7 @@ type MesosSchedulerDriver struct {
dispatch func(context.Context, *upid.UPID, proto.Message) error // send a message somewhere dispatch func(context.Context, *upid.UPID, proto.Message) error // send a message somewhere
started chan struct{} // signal chan that closes upon a successful call to Start() started chan struct{} // signal chan that closes upon a successful call to Start()
eventLock sync.RWMutex // guard for all driver state eventLock sync.RWMutex // guard for all driver state
withScheduler func(f func(s Scheduler)) // execute some func with respect to the given scheduler withScheduler func(f func(s Scheduler)) // execute some func with respect to the given scheduler; should be the last thing invoked in a handler (lock semantics)
done chan struct{} // signal chan that closes when no more events will be processed done chan struct{} // signal chan that closes when no more events will be processed
} }
@ -319,17 +321,27 @@ func (driver *MesosSchedulerDriver) makeWithScheduler(cs Scheduler) func(func(Sc
} }
} }
// ctx returns the current context.Context for the driver, expects to be invoked
// only when eventLock is locked.
func (driver *MesosSchedulerDriver) context() context.Context {
// set a "session" attribute so that the messenger can see it
// and use it for reporting delivery errors.
return sessionid.NewContext(context.TODO(), driver.connection.String())
}
// init initializes the driver. // init initializes the driver.
func (driver *MesosSchedulerDriver) init() error { func (driver *MesosSchedulerDriver) init() error {
log.Infof("Initializing mesos scheduler driver\n") log.Infof("Initializing mesos scheduler driver\n")
driver.dispatch = driver.messenger.Send driver.dispatch = driver.messenger.Send
// serialize all callbacks from the messenger // serialize all callbacks from the messenger
guarded := func(h messenger.MessageHandler) messenger.MessageHandler { type messageHandler func(context.Context, *upid.UPID, proto.Message)
guarded := func(h messageHandler) messenger.MessageHandler {
return messenger.MessageHandler(func(from *upid.UPID, msg proto.Message) { return messenger.MessageHandler(func(from *upid.UPID, msg proto.Message) {
driver.eventLock.Lock() driver.eventLock.Lock()
defer driver.eventLock.Unlock() defer driver.eventLock.Unlock()
h(from, msg) h(driver.context(), from, msg)
}) })
} }
@ -345,11 +357,41 @@ func (driver *MesosSchedulerDriver) init() error {
driver.messenger.Install(guarded(driver.exitedExecutor), &mesos.ExitedExecutorMessage{}) driver.messenger.Install(guarded(driver.exitedExecutor), &mesos.ExitedExecutorMessage{})
driver.messenger.Install(guarded(driver.handleMasterChanged), &mesos.InternalMasterChangeDetected{}) driver.messenger.Install(guarded(driver.handleMasterChanged), &mesos.InternalMasterChangeDetected{})
driver.messenger.Install(guarded(driver.handleAuthenticationResult), &mesos.InternalAuthenticationResult{}) driver.messenger.Install(guarded(driver.handleAuthenticationResult), &mesos.InternalAuthenticationResult{})
driver.messenger.Install(guarded(driver.handleNetworkError), &mesos.InternalNetworkError{})
return nil return nil
} }
func (driver *MesosSchedulerDriver) handleNetworkError(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
msg := pbMsg.(*mesos.InternalNetworkError)
if driver.status == mesos.Status_DRIVER_ABORTED {
log.Info("ignoring network error because the driver is aborted.")
return
} else if !from.Equal(driver.self) {
log.Errorf("ignoring network error because message received from upid '%v'", from)
return
} else if !driver.connected {
log.V(1).Infof("ignoring network error since we're not currently connected")
return
}
if driver.masterPid.String() == msg.GetPid() && driver.connection.String() == msg.GetSession() {
// fire a disconnection event
log.V(3).Info("Disconnecting scheduler.")
// need to set all 3 of these at once, since withScheduler() temporarily releases the lock and we don't
// want inconsistent connection facts
driver.masterPid = nil
driver.connected = false
driver.authenticated = false
driver.withScheduler(func(s Scheduler) { s.Disconnected(driver) })
log.Info("master disconnected")
}
}
// lead master detection callback. // lead master detection callback.
func (driver *MesosSchedulerDriver) handleMasterChanged(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) handleMasterChanged(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesos.Status_DRIVER_ABORTED { if driver.status == mesos.Status_DRIVER_ABORTED {
log.Info("Ignoring master change because the driver is aborted.") log.Info("Ignoring master change because the driver is aborted.")
return return
@ -359,18 +401,20 @@ func (driver *MesosSchedulerDriver) handleMasterChanged(from *upid.UPID, pbMsg p
} }
// Reconnect every time a master is detected. // Reconnect every time a master is detected.
if driver.connected { wasConnected := driver.connected
driver.connected = false
driver.authenticated = false
alertScheduler := false
if wasConnected {
log.V(3).Info("Disconnecting scheduler.") log.V(3).Info("Disconnecting scheduler.")
driver.masterPid = nil driver.masterPid = nil
driver.withScheduler(func(s Scheduler) { s.Disconnected(driver) }) alertScheduler = true
} }
msg := pbMsg.(*mesos.InternalMasterChangeDetected) msg := pbMsg.(*mesos.InternalMasterChangeDetected)
master := msg.Master master := msg.Master
driver.connected = false
driver.authenticated = false
if master != nil { if master != nil {
log.Infof("New master %s detected\n", master.GetPid()) log.Infof("New master %s detected\n", master.GetPid())
@ -380,10 +424,13 @@ func (driver *MesosSchedulerDriver) handleMasterChanged(from *upid.UPID, pbMsg p
} }
driver.masterPid = pid // save for downstream ops. driver.masterPid = pid // save for downstream ops.
driver.tryAuthentication() defer driver.tryAuthentication()
} else { } else {
log.Infoln("No master detected.") log.Infoln("No master detected.")
} }
if alertScheduler {
driver.withScheduler(func(s Scheduler) { s.Disconnected(driver) })
}
} }
// tryAuthentication expects to be guarded by eventLock // tryAuthentication expects to be guarded by eventLock
@ -439,7 +486,7 @@ func (driver *MesosSchedulerDriver) tryAuthentication() {
} }
} }
func (driver *MesosSchedulerDriver) handleAuthenticationResult(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) handleAuthenticationResult(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status != mesos.Status_DRIVER_RUNNING { if driver.status != mesos.Status_DRIVER_RUNNING {
log.V(1).Info("ignoring authenticate because driver is not running") log.V(1).Info("ignoring authenticate because driver is not running")
return return
@ -506,7 +553,7 @@ func (driver *MesosSchedulerDriver) stopped() bool {
} }
// ---------------------- Handlers for Events from Master --------------- // // ---------------------- Handlers for Events from Master --------------- //
func (driver *MesosSchedulerDriver) frameworkRegistered(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) frameworkRegistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(2).Infoln("Handling scheduler driver framework registered event.") log.V(2).Infoln("Handling scheduler driver framework registered event.")
msg := pbMsg.(*mesos.FrameworkRegisteredMessage) msg := pbMsg.(*mesos.FrameworkRegisteredMessage)
@ -542,7 +589,7 @@ func (driver *MesosSchedulerDriver) frameworkRegistered(from *upid.UPID, pbMsg p
driver.withScheduler(func(s Scheduler) { s.Registered(driver, frameworkId, masterInfo) }) driver.withScheduler(func(s Scheduler) { s.Registered(driver, frameworkId, masterInfo) })
} }
func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) frameworkReregistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling Scheduler re-registered event.") log.V(1).Infoln("Handling Scheduler re-registered event.")
msg := pbMsg.(*mesos.FrameworkReregisteredMessage) msg := pbMsg.(*mesos.FrameworkReregisteredMessage)
@ -566,10 +613,9 @@ func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg
driver.connection = uuid.NewUUID() driver.connection = uuid.NewUUID()
driver.withScheduler(func(s Scheduler) { s.Reregistered(driver, msg.GetMasterInfo()) }) driver.withScheduler(func(s Scheduler) { s.Reregistered(driver, msg.GetMasterInfo()) })
} }
func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) resourcesOffered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(2).Infoln("Handling resource offers.") log.V(2).Infoln("Handling resource offers.")
msg := pbMsg.(*mesos.ResourceOffersMessage) msg := pbMsg.(*mesos.ResourceOffersMessage)
@ -601,7 +647,7 @@ func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg prot
driver.withScheduler(func(s Scheduler) { s.ResourceOffers(driver, msg.Offers) }) driver.withScheduler(func(s Scheduler) { s.ResourceOffers(driver, msg.Offers) })
} }
func (driver *MesosSchedulerDriver) resourceOfferRescinded(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) resourceOfferRescinded(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling resource offer rescinded.") log.V(1).Infoln("Handling resource offer rescinded.")
msg := pbMsg.(*mesos.RescindResourceOfferMessage) msg := pbMsg.(*mesos.RescindResourceOfferMessage)
@ -623,11 +669,7 @@ func (driver *MesosSchedulerDriver) resourceOfferRescinded(from *upid.UPID, pbMs
driver.withScheduler(func(s Scheduler) { s.OfferRescinded(driver, msg.OfferId) }) driver.withScheduler(func(s Scheduler) { s.OfferRescinded(driver, msg.OfferId) })
} }
func (driver *MesosSchedulerDriver) send(upid *upid.UPID, msg proto.Message) error { func (driver *MesosSchedulerDriver) send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
//TODO(jdef) should implement timeout here
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
c := make(chan error, 1) c := make(chan error, 1)
go func() { c <- driver.dispatch(ctx, upid, msg) }() go func() { c <- driver.dispatch(ctx, upid, msg) }()
@ -641,7 +683,7 @@ func (driver *MesosSchedulerDriver) send(upid *upid.UPID, msg proto.Message) err
} }
// statusUpdated expects to be guarded by eventLock // statusUpdated expects to be guarded by eventLock
func (driver *MesosSchedulerDriver) statusUpdated(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) statusUpdated(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
msg := pbMsg.(*mesos.StatusUpdateMessage) msg := pbMsg.(*mesos.StatusUpdateMessage)
if driver.status != mesos.Status_DRIVER_RUNNING { if driver.status != mesos.Status_DRIVER_RUNNING {
@ -679,35 +721,33 @@ func (driver *MesosSchedulerDriver) statusUpdated(from *upid.UPID, pbMsg proto.M
status.Uuid = msg.Update.Uuid status.Uuid = msg.Update.Uuid
} }
driver.withScheduler(func(s Scheduler) { s.StatusUpdate(driver, status) })
if driver.status == mesos.Status_DRIVER_ABORTED { if driver.status == mesos.Status_DRIVER_ABORTED {
log.V(1).Infoln("Not sending StatusUpdate ACK, the driver is aborted!") log.V(1).Infoln("Not sending StatusUpdate ACK, the driver is aborted!")
return
}
// Send StatusUpdate Acknowledgement; see above for the rules.
// Only send ACK if udpate was not from this driver and spec'd a UUID; this is compat w/ 0.23+
ackRequired := len(msg.Update.Uuid) > 0 && !from.Equal(driver.self) && msg.GetPid() != driver.self.String()
if ackRequired {
ackMsg := &mesos.StatusUpdateAcknowledgementMessage{
SlaveId: msg.Update.SlaveId,
FrameworkId: driver.frameworkInfo.Id,
TaskId: msg.Update.Status.TaskId,
Uuid: msg.Update.Uuid,
}
log.V(2).Infof("Sending ACK for status update %+v to %q", *msg.Update, from.String())
if err := driver.send(driver.masterPid, ackMsg); err != nil {
log.Errorf("Failed to send StatusUpdate ACK message: %v", err)
return
}
} else { } else {
log.V(2).Infof("Not sending ACK, update is not from slave %q", from.String())
// Send StatusUpdate Acknowledgement; see above for the rules.
// Only send ACK if udpate was not from this driver and spec'd a UUID; this is compat w/ 0.23+
ackRequired := len(msg.Update.Uuid) > 0 && !from.Equal(driver.self) && msg.GetPid() != driver.self.String()
if ackRequired {
ackMsg := &mesos.StatusUpdateAcknowledgementMessage{
SlaveId: msg.Update.SlaveId,
FrameworkId: driver.frameworkInfo.Id,
TaskId: msg.Update.Status.TaskId,
Uuid: msg.Update.Uuid,
}
log.V(2).Infof("Sending ACK for status update %+v to %q", *msg.Update, from.String())
if err := driver.send(ctx, driver.masterPid, ackMsg); err != nil {
log.Errorf("Failed to send StatusUpdate ACK message: %v", err)
}
} else {
log.V(2).Infof("Not sending ACK, update is not from slave %q", from.String())
}
} }
driver.withScheduler(func(s Scheduler) { s.StatusUpdate(driver, status) })
} }
func (driver *MesosSchedulerDriver) exitedExecutor(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) exitedExecutor(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling ExitedExceutor event.") log.V(1).Infoln("Handling ExitedExceutor event.")
msg := pbMsg.(*mesos.ExitedExecutorMessage) msg := pbMsg.(*mesos.ExitedExecutorMessage)
@ -728,7 +768,7 @@ func (driver *MesosSchedulerDriver) exitedExecutor(from *upid.UPID, pbMsg proto.
driver.withScheduler(func(s Scheduler) { s.ExecutorLost(driver, msg.GetExecutorId(), msg.GetSlaveId(), int(status)) }) driver.withScheduler(func(s Scheduler) { s.ExecutorLost(driver, msg.GetExecutorId(), msg.GetSlaveId(), int(status)) })
} }
func (driver *MesosSchedulerDriver) slaveLost(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) slaveLost(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling LostSlave event.") log.V(1).Infoln("Handling LostSlave event.")
msg := pbMsg.(*mesos.LostSlaveMessage) msg := pbMsg.(*mesos.LostSlaveMessage)
@ -751,7 +791,7 @@ func (driver *MesosSchedulerDriver) slaveLost(from *upid.UPID, pbMsg proto.Messa
driver.withScheduler(func(s Scheduler) { s.SlaveLost(driver, msg.SlaveId) }) driver.withScheduler(func(s Scheduler) { s.SlaveLost(driver, msg.SlaveId) })
} }
func (driver *MesosSchedulerDriver) frameworkMessageRcvd(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) frameworkMessageRcvd(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling framework message event.") log.V(1).Infoln("Handling framework message event.")
msg := pbMsg.(*mesos.ExecutorToFrameworkMessage) msg := pbMsg.(*mesos.ExecutorToFrameworkMessage)
@ -766,10 +806,10 @@ func (driver *MesosSchedulerDriver) frameworkMessageRcvd(from *upid.UPID, pbMsg
driver.withScheduler(func(s Scheduler) { s.FrameworkMessage(driver, msg.ExecutorId, msg.SlaveId, string(msg.Data)) }) driver.withScheduler(func(s Scheduler) { s.FrameworkMessage(driver, msg.ExecutorId, msg.SlaveId, string(msg.Data)) })
} }
func (driver *MesosSchedulerDriver) frameworkErrorRcvd(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) frameworkErrorRcvd(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling framework error event.") log.V(1).Infoln("Handling framework error event.")
msg := pbMsg.(*mesos.FrameworkErrorMessage) msg := pbMsg.(*mesos.FrameworkErrorMessage)
driver.error(msg.GetMessage()) driver.error(ctx, msg.GetMessage())
} }
// ---------------------- Interface Methods ---------------------- // // ---------------------- Interface Methods ---------------------- //
@ -828,7 +868,7 @@ func (driver *MesosSchedulerDriver) start() (mesos.Status, error) {
// authenticate against the spec'd master pid using the configured authenticationProvider. // authenticate against the spec'd master pid using the configured authenticationProvider.
// the authentication process is canceled upon either cancelation of authenticating, or // the authentication process is canceled upon either cancelation of authenticating, or
// else because it timed out (authTimeout). // else because it timed out (see defaultAuthenticationTimeout, auth.Timeout).
// //
// TODO(jdef) perhaps at some point in the future this will get pushed down into // TODO(jdef) perhaps at some point in the future this will get pushed down into
// the messenger layer (e.g. to use HTTP-based authentication). We'd probably still // the messenger layer (e.g. to use HTTP-based authentication). We'd probably still
@ -837,17 +877,28 @@ func (driver *MesosSchedulerDriver) start() (mesos.Status, error) {
// //
func (driver *MesosSchedulerDriver) authenticate(pid *upid.UPID, authenticating *authenticationAttempt) error { func (driver *MesosSchedulerDriver) authenticate(pid *upid.UPID, authenticating *authenticationAttempt) error {
log.Infof("authenticating with master %v", pid) log.Infof("authenticating with master %v", pid)
ctx, cancel := context.WithTimeout(context.Background(), authTimeout)
handler := &CredentialHandler{ var (
pid: pid, authTimeout = defaultAuthenticationTimeout
client: driver.self, ctx = driver.withAuthContext(context.TODO())
credential: driver.credential, handler = &CredentialHandler{
pid: pid,
client: driver.self,
credential: driver.credential,
}
)
// check for authentication timeout override
if d, ok := auth.TimeoutFrom(ctx); ok {
authTimeout = d
} }
ctx = driver.withAuthContext(ctx)
ctx, cancel := context.WithTimeout(ctx, authTimeout)
ctx = auth.WithParentUPID(ctx, *driver.self) ctx = auth.WithParentUPID(ctx, *driver.self)
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { ch <- auth.Login(ctx, handler) }() go func() { ch <- auth.Login(ctx, handler) }()
select { select {
case <-ctx.Done(): case <-ctx.Done():
<-ch <-ch
@ -855,7 +906,7 @@ func (driver *MesosSchedulerDriver) authenticate(pid *upid.UPID, authenticating
case <-authenticating.done: case <-authenticating.done:
cancel() cancel()
<-ch <-ch
return authenticationCanceledError return errAuthenticationCanceled
case e := <-ch: case e := <-ch:
cancel() cancel()
return e return e
@ -901,6 +952,7 @@ func (driver *MesosSchedulerDriver) registerOnce() bool {
failover bool failover bool
pid *upid.UPID pid *upid.UPID
info *mesos.FrameworkInfo info *mesos.FrameworkInfo
ctx context.Context
) )
if func() bool { if func() bool {
driver.eventLock.RLock() driver.eventLock.RLock()
@ -914,6 +966,7 @@ func (driver *MesosSchedulerDriver) registerOnce() bool {
failover = driver.failover failover = driver.failover
pid = driver.masterPid pid = driver.masterPid
info = proto.Clone(driver.frameworkInfo).(*mesos.FrameworkInfo) info = proto.Clone(driver.frameworkInfo).(*mesos.FrameworkInfo)
ctx = driver.context()
return true return true
}() { }() {
// register framework // register framework
@ -931,7 +984,7 @@ func (driver *MesosSchedulerDriver) registerOnce() bool {
Framework: info, Framework: info,
} }
} }
if err := driver.send(pid, message); err != nil { if err := driver.send(ctx, pid, message); err != nil {
log.Errorf("failed to send RegisterFramework message: %v", err) log.Errorf("failed to send RegisterFramework message: %v", err)
if _, err = driver.Stop(failover); err != nil { if _, err = driver.Stop(failover); err != nil {
log.Errorf("failed to stop scheduler driver: %v", err) log.Errorf("failed to stop scheduler driver: %v", err)
@ -982,15 +1035,15 @@ waitForDeath:
func (driver *MesosSchedulerDriver) Run() (mesos.Status, error) { func (driver *MesosSchedulerDriver) Run() (mesos.Status, error) {
driver.eventLock.Lock() driver.eventLock.Lock()
defer driver.eventLock.Unlock() defer driver.eventLock.Unlock()
return driver.run() return driver.run(driver.context())
} }
// run expected to be guarded by eventLock // run expected to be guarded by eventLock
func (driver *MesosSchedulerDriver) run() (mesos.Status, error) { func (driver *MesosSchedulerDriver) run(ctx context.Context) (mesos.Status, error) {
stat, err := driver.start() stat, err := driver.start()
if err != nil { if err != nil {
return driver.stop(err, false) return driver.stop(ctx, err, false)
} }
if stat != mesos.Status_DRIVER_RUNNING { if stat != mesos.Status_DRIVER_RUNNING {
@ -1005,11 +1058,11 @@ func (driver *MesosSchedulerDriver) run() (mesos.Status, error) {
func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) { func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) {
driver.eventLock.Lock() driver.eventLock.Lock()
defer driver.eventLock.Unlock() defer driver.eventLock.Unlock()
return driver.stop(nil, failover) return driver.stop(driver.context(), nil, failover)
} }
// stop expects to be guarded by eventLock // stop expects to be guarded by eventLock
func (driver *MesosSchedulerDriver) stop(cause error, failover bool) (mesos.Status, error) { func (driver *MesosSchedulerDriver) stop(ctx context.Context, cause error, failover bool) (mesos.Status, error) {
log.Infoln("Stopping the scheduler driver") log.Infoln("Stopping the scheduler driver")
if stat := driver.status; stat != mesos.Status_DRIVER_RUNNING { if stat := driver.status; stat != mesos.Status_DRIVER_RUNNING {
return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat) return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat)
@ -1024,7 +1077,7 @@ func (driver *MesosSchedulerDriver) stop(cause error, failover bool) (mesos.Stat
//TODO(jdef) this is actually a little racy: we send an 'unregister' message but then //TODO(jdef) this is actually a little racy: we send an 'unregister' message but then
// immediately afterward the messenger is stopped in driver._stop(). so the unregister message // immediately afterward the messenger is stopped in driver._stop(). so the unregister message
// may not actually end up being sent out. // may not actually end up being sent out.
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(ctx, driver.masterPid, message); err != nil {
log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err) log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err)
if cause == nil { if cause == nil {
cause = &ErrDriverAborted{} cause = &ErrDriverAborted{}
@ -1048,17 +1101,23 @@ func (driver *MesosSchedulerDriver) _stop(cause error, stopStatus mesos.Status)
default: default:
} }
close(driver.stopCh) close(driver.stopCh)
if cause != nil { // decouple to avoid deadlock (avoid nested withScheduler() invocations)
log.V(1).Infof("Sending error via withScheduler: %v", cause) go func() {
driver.withScheduler(func(s Scheduler) { s.Error(driver, cause.Error()) }) driver.eventLock.Lock()
} else { defer driver.eventLock.Unlock()
// send a noop func, withScheduler needs to see that stopCh is closed if cause != nil {
log.V(1).Infof("Sending kill signal to withScheduler") log.V(1).Infof("Sending error via withScheduler: %v", cause)
driver.withScheduler(func(_ Scheduler) {}) driver.withScheduler(func(s Scheduler) { s.Error(driver, cause.Error()) })
} } else {
// send a noop func, withScheduler needs to see that stopCh is closed
log.V(1).Infof("Sending kill signal to withScheduler")
driver.withScheduler(func(_ Scheduler) {})
}
}()
}() }()
driver.status = stopStatus driver.status = stopStatus
driver.connected = false driver.connected = false
driver.connection = uuid.UUID{}
log.Info("stopping messenger") log.Info("stopping messenger")
err := driver.messenger.Stop() err := driver.messenger.Stop()
@ -1070,11 +1129,11 @@ func (driver *MesosSchedulerDriver) _stop(cause error, stopStatus mesos.Status)
func (driver *MesosSchedulerDriver) Abort() (stat mesos.Status, err error) { func (driver *MesosSchedulerDriver) Abort() (stat mesos.Status, err error) {
driver.eventLock.Lock() driver.eventLock.Lock()
defer driver.eventLock.Unlock() defer driver.eventLock.Unlock()
return driver.abort(nil) return driver.abort(driver.context(), nil)
} }
// abort expects to be guarded by eventLock // abort expects to be guarded by eventLock
func (driver *MesosSchedulerDriver) abort(cause error) (stat mesos.Status, err error) { func (driver *MesosSchedulerDriver) abort(ctx context.Context, cause error) (stat mesos.Status, err error) {
if driver.masterDetector != nil { if driver.masterDetector != nil {
defer driver.masterDetector.Cancel() defer driver.masterDetector.Cancel()
} }
@ -1082,7 +1141,7 @@ func (driver *MesosSchedulerDriver) abort(cause error) (stat mesos.Status, err e
log.Infof("Aborting framework [%+v]", driver.frameworkInfo.Id) log.Infof("Aborting framework [%+v]", driver.frameworkInfo.Id)
if driver.connected { if driver.connected {
_, err = driver.stop(cause, true) _, err = driver.stop(ctx, cause, true)
} else { } else {
driver._stop(cause, mesos.Status_DRIVER_ABORTED) driver._stop(cause, mesos.Status_DRIVER_ABORTED)
} }
@ -1100,13 +1159,21 @@ func (driver *MesosSchedulerDriver) AcceptOffers(offerIds []*mesos.OfferID, oper
return stat, fmt.Errorf("Unable to AcceptOffers, expected driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, stat) return stat, fmt.Errorf("Unable to AcceptOffers, expected driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, stat)
} }
ctx := driver.context()
if !driver.connected { if !driver.connected {
err := fmt.Errorf("Not connected to master.") err := ErrDisconnected
for _, operation := range operations { for _, operation := range operations {
if *operation.Type == mesos.Offer_Operation_LAUNCH { if *operation.Type == mesos.Offer_Operation_LAUNCH {
for _, task := range operation.Launch.TaskInfos { // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations)
driver.pushLostTask(task, "Unable to launch tasks: "+err.Error()) operation := operation
} go func() {
driver.eventLock.Lock()
defer driver.eventLock.Unlock()
for _, task := range operation.Launch.TaskInfos {
driver.pushLostTask(ctx, task, err.Error())
}
}()
} }
} }
log.Errorf("Failed to send LaunchTask message: %v\n", err) log.Errorf("Failed to send LaunchTask message: %v\n", err)
@ -1188,12 +1255,19 @@ func (driver *MesosSchedulerDriver) AcceptOffers(offerIds []*mesos.OfferID, oper
}, },
} }
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(ctx, driver.masterPid, message); err != nil {
for _, operation := range operations { for _, operation := range operations {
if *operation.Type == mesos.Offer_Operation_LAUNCH { if *operation.Type == mesos.Offer_Operation_LAUNCH {
for _, task := range operation.Launch.TaskInfos { // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations)
driver.pushLostTask(task, "Unable to launch tasks: "+err.Error()) operation := operation
} go func() {
driver.eventLock.Lock()
defer driver.eventLock.Unlock()
for _, task := range operation.Launch.TaskInfos {
driver.pushLostTask(ctx, task, "Unable to launch tasks: "+err.Error())
}
}()
} }
} }
log.Errorf("Failed to send LaunchTask message: %v\n", err) log.Errorf("Failed to send LaunchTask message: %v\n", err)
@ -1211,15 +1285,24 @@ func (driver *MesosSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, tasks
return stat, fmt.Errorf("Unable to LaunchTasks, expected driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, stat) return stat, fmt.Errorf("Unable to LaunchTasks, expected driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, stat)
} }
ctx := driver.context()
// Launch tasks // Launch tasks
if !driver.connected { if !driver.connected {
log.Infoln("Ignoring LaunchTasks message, disconnected from master.") log.Infoln("Ignoring LaunchTasks message, disconnected from master.")
// Send statusUpdate with status=TASK_LOST for each task. // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations)
// See sched.cpp L#823 err := ErrDisconnected
for _, task := range tasks { go func() {
driver.pushLostTask(task, "Master is disconnected") driver.eventLock.Lock()
} defer driver.eventLock.Unlock()
return driver.status, fmt.Errorf("Not connected to master. Tasks marked as lost.")
// Send statusUpdate with status=TASK_LOST for each task.
// See sched.cpp L#823
for _, task := range tasks {
driver.pushLostTask(ctx, task, err.Error())
}
}()
return driver.status, err
} }
okTasks := make([]*mesos.TaskInfo, 0, len(tasks)) okTasks := make([]*mesos.TaskInfo, 0, len(tasks))
@ -1260,10 +1343,16 @@ func (driver *MesosSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, tasks
Filters: filters, Filters: filters,
} }
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(ctx, driver.masterPid, message); err != nil {
for _, task := range tasks { // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations)
driver.pushLostTask(task, "Unable to launch tasks: "+err.Error()) go func() {
} driver.eventLock.Lock()
defer driver.eventLock.Unlock()
for _, task := range tasks {
driver.pushLostTask(ctx, task, "Unable to launch tasks: "+err.Error())
}
}()
log.Errorf("Failed to send LaunchTask message: %v\n", err) log.Errorf("Failed to send LaunchTask message: %v\n", err)
return driver.status, err return driver.status, err
} }
@ -1272,7 +1361,7 @@ func (driver *MesosSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, tasks
} }
// pushLostTask expects to be guarded by eventLock // pushLostTask expects to be guarded by eventLock
func (driver *MesosSchedulerDriver) pushLostTask(taskInfo *mesos.TaskInfo, why string) { func (driver *MesosSchedulerDriver) pushLostTask(ctx context.Context, taskInfo *mesos.TaskInfo, why string) {
msg := &mesos.StatusUpdateMessage{ msg := &mesos.StatusUpdateMessage{
Update: &mesos.StatusUpdate{ Update: &mesos.StatusUpdate{
FrameworkId: driver.frameworkInfo.Id, FrameworkId: driver.frameworkInfo.Id,
@ -1292,7 +1381,7 @@ func (driver *MesosSchedulerDriver) pushLostTask(taskInfo *mesos.TaskInfo, why s
// put it on internal chanel // put it on internal chanel
// will cause handler to push to attached Scheduler // will cause handler to push to attached Scheduler
driver.statusUpdated(driver.self, msg) driver.statusUpdated(ctx, driver.self, msg)
} }
func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status, error) { func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status, error) {
@ -1305,7 +1394,7 @@ func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status
if !driver.connected { if !driver.connected {
log.Infoln("Ignoring kill task message, disconnected from master.") log.Infoln("Ignoring kill task message, disconnected from master.")
return driver.status, fmt.Errorf("Not connected to master") return driver.status, ErrDisconnected
} }
message := &mesos.KillTaskMessage{ message := &mesos.KillTaskMessage{
@ -1313,7 +1402,7 @@ func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status
TaskId: taskId, TaskId: taskId,
} }
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(driver.context(), driver.masterPid, message); err != nil {
log.Errorf("Failed to send KillTask message: %v\n", err) log.Errorf("Failed to send KillTask message: %v\n", err)
return driver.status, err return driver.status, err
} }
@ -1331,7 +1420,7 @@ func (driver *MesosSchedulerDriver) RequestResources(requests []*mesos.Request)
if !driver.connected { if !driver.connected {
log.Infoln("Ignoring request resource message, disconnected from master.") log.Infoln("Ignoring request resource message, disconnected from master.")
return driver.status, fmt.Errorf("Not connected to master") return driver.status, ErrDisconnected
} }
message := &mesos.ResourceRequestMessage{ message := &mesos.ResourceRequestMessage{
@ -1339,7 +1428,7 @@ func (driver *MesosSchedulerDriver) RequestResources(requests []*mesos.Request)
Requests: requests, Requests: requests,
} }
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(driver.context(), driver.masterPid, message); err != nil {
log.Errorf("Failed to send ResourceRequest message: %v\n", err) log.Errorf("Failed to send ResourceRequest message: %v\n", err)
return driver.status, err return driver.status, err
} }
@ -1362,13 +1451,13 @@ func (driver *MesosSchedulerDriver) ReviveOffers() (mesos.Status, error) {
} }
if !driver.connected { if !driver.connected {
log.Infoln("Ignoring revive offers message, disconnected from master.") log.Infoln("Ignoring revive offers message, disconnected from master.")
return driver.status, fmt.Errorf("Not connected to master.") return driver.status, ErrDisconnected
} }
message := &mesos.ReviveOffersMessage{ message := &mesos.ReviveOffersMessage{
FrameworkId: driver.frameworkInfo.Id, FrameworkId: driver.frameworkInfo.Id,
} }
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(driver.context(), driver.masterPid, message); err != nil {
log.Errorf("Failed to send ReviveOffers message: %v\n", err) log.Errorf("Failed to send ReviveOffers message: %v\n", err)
return driver.status, err return driver.status, err
} }
@ -1385,7 +1474,7 @@ func (driver *MesosSchedulerDriver) SendFrameworkMessage(executorId *mesos.Execu
} }
if !driver.connected { if !driver.connected {
log.Infoln("Ignoring send framework message, disconnected from master.") log.Infoln("Ignoring send framework message, disconnected from master.")
return driver.status, fmt.Errorf("Not connected to master") return driver.status, ErrDisconnected
} }
message := &mesos.FrameworkToExecutorMessage{ message := &mesos.FrameworkToExecutorMessage{
@ -1401,13 +1490,13 @@ func (driver *MesosSchedulerDriver) SendFrameworkMessage(executorId *mesos.Execu
if slavePid.Equal(driver.self) { if slavePid.Equal(driver.self) {
return driver.status, nil return driver.status, nil
} }
if err := driver.send(slavePid, message); err != nil { if err := driver.send(driver.context(), slavePid, message); err != nil {
log.Errorf("Failed to send framework to executor message: %v\n", err) log.Errorf("Failed to send framework to executor message: %v\n", err)
return driver.status, err return driver.status, err
} }
} else { } else {
// slavePid not cached, send to master. // slavePid not cached, send to master.
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(driver.context(), driver.masterPid, message); err != nil {
log.Errorf("Failed to send framework to executor message: %v\n", err) log.Errorf("Failed to send framework to executor message: %v\n", err)
return driver.status, err return driver.status, err
} }
@ -1425,14 +1514,14 @@ func (driver *MesosSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus)
} }
if !driver.connected { if !driver.connected {
log.Infoln("Ignoring send Reconcile Tasks message, disconnected from master.") log.Infoln("Ignoring send Reconcile Tasks message, disconnected from master.")
return driver.status, fmt.Errorf("Not connected to master.") return driver.status, ErrDisconnected
} }
message := &mesos.ReconcileTasksMessage{ message := &mesos.ReconcileTasksMessage{
FrameworkId: driver.frameworkInfo.Id, FrameworkId: driver.frameworkInfo.Id,
Statuses: statuses, Statuses: statuses,
} }
if err := driver.send(driver.masterPid, message); err != nil { if err := driver.send(driver.context(), driver.masterPid, message); err != nil {
log.Errorf("Failed to send reconcile tasks message: %v\n", err) log.Errorf("Failed to send reconcile tasks message: %v\n", err)
return driver.status, err return driver.status, err
} }
@ -1441,10 +1530,10 @@ func (driver *MesosSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus)
} }
// error expects to be guarded by eventLock // error expects to be guarded by eventLock
func (driver *MesosSchedulerDriver) error(err string) { func (driver *MesosSchedulerDriver) error(ctx context.Context, err string) {
if driver.status == mesos.Status_DRIVER_ABORTED { if driver.status == mesos.Status_DRIVER_ABORTED {
log.V(3).Infoln("Ignoring error message, the driver is aborted!") log.V(3).Infoln("Ignoring error message, the driver is aborted!")
return return
} }
driver.abort(&ErrDriverAborted{Reason: err}) driver.abort(ctx, &ErrDriverAborted{Reason: err})
} }