diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 7581a5e0d8f..23ffa69bcc4 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -757,43 +757,43 @@ }, { "ImportPath": "github.com/mesos/mesos-go/auth", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/mesos/mesos-go/detector", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/mesos/mesos-go/executor", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/mesos/mesos-go/mesosproto", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/mesos/mesos-go/mesosutil", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/mesos/mesos-go/messenger", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/mesos/mesos-go/scheduler", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/mesos/mesos-go/upid", - "Comment": "before-0.26-protos-14-g4a7554a", - "Rev": "4a7554aad396c70d19c9fc3469980547c9f117ae" + "Comment": "before-0.26-protos-29-gb755e34", + "Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" }, { "ImportPath": "github.com/miekg/dns", diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/login.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/login.go index 416c2d61274..58dc9d3e993 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/login.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/login.go @@ -3,6 +3,7 @@ package auth import ( "errors" "fmt" + "time" "github.com/mesos/mesos-go/auth/callback" "github.com/mesos/mesos-go/upid" @@ -38,8 +39,14 @@ func Login(ctx context.Context, handler callback.Handler) error { type loginKeyType int const ( - loginProviderNameKey loginKeyType = iota // name of login provider to use - parentUpidKey // upid.UPID of some parent process + // name of login provider to use + loginProviderNameKey loginKeyType = iota + + // upid.UPID of some parent process + parentUpidKey + + // time.Duration that limits the overall duration of an auth attempt + timeoutKey ) // Return a context that inherits all values from the parent ctx and specifies @@ -74,7 +81,20 @@ func ParentUPIDFrom(ctx context.Context) (pid upid.UPID, ok bool) { func ParentUPID(ctx context.Context) (upid *upid.UPID) { if upid, ok := ParentUPIDFrom(ctx); ok { return &upid - } else { - return nil } + return nil +} + +func TimeoutFrom(ctx context.Context) (d time.Duration, ok bool) { + d, ok = ctx.Value(timeoutKey).(time.Duration) + return +} + +func Timeout(ctx context.Context) (d time.Duration) { + d, _ = TimeoutFrom(ctx) + return +} + +func WithTimeout(ctx context.Context, d time.Duration) context.Context { + return context.WithValue(ctx, timeoutKey, d) } diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go index 4506522ac24..06c0c9317eb 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go @@ -31,11 +31,16 @@ import ( "github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil/process" "github.com/mesos/mesos-go/messenger" + "github.com/mesos/mesos-go/messenger/sessionid" "github.com/mesos/mesos-go/upid" "github.com/pborman/uuid" "golang.org/x/net/context" ) +const ( + defaultRecoveryTimeout = 15 * time.Minute +) + type DriverConfig struct { Executor Executor HostnameOverride string // optional @@ -64,6 +69,7 @@ type MesosExecutorDriver struct { directory string // TODO(yifan): Not used yet. checkpoint bool recoveryTimeout time.Duration + recoveryTimer *time.Timer updates map[string]*mesosproto.StatusUpdate // Key is a UUID string. TODO(yifan): Not used yet. tasks map[string]*mesosproto.TaskInfo // Key is a UUID string. TODO(yifan): Not used yet. withExecutor func(f func(e Executor)) @@ -88,12 +94,13 @@ func NewMesosExecutorDriver(config DriverConfig) (*MesosExecutorDriver, error) { } driver := &MesosExecutorDriver{ - status: mesosproto.Status_DRIVER_NOT_STARTED, - stopCh: make(chan struct{}), - updates: make(map[string]*mesosproto.StatusUpdate), - tasks: make(map[string]*mesosproto.TaskInfo), - workDir: ".", - started: make(chan struct{}), + status: mesosproto.Status_DRIVER_NOT_STARTED, + stopCh: make(chan struct{}), + updates: make(map[string]*mesosproto.StatusUpdate), + tasks: make(map[string]*mesosproto.TaskInfo), + workDir: ".", + started: make(chan struct{}), + recoveryTimeout: defaultRecoveryTimeout, } driver.cond = sync.NewCond(&driver.lock) // decouple serialized executor callback execution from goroutines of this driver @@ -116,6 +123,11 @@ func NewMesosExecutorDriver(config DriverConfig) (*MesosExecutorDriver, error) { return driver, nil } +// context returns the driver context, expects driver.lock to be locked +func (driver *MesosExecutorDriver) context() context.Context { + return sessionid.NewContext(context.TODO(), driver.connection.String()) +} + // init initializes the driver. func (driver *MesosExecutorDriver) init() error { log.Infof("Init mesos executor driver\n") @@ -127,11 +139,13 @@ func (driver *MesosExecutorDriver) init() error { return err } - guard := func(h messenger.MessageHandler) messenger.MessageHandler { + type messageHandler func(context.Context, *upid.UPID, proto.Message) + + guard := func(h messageHandler) messenger.MessageHandler { return messenger.MessageHandler(func(from *upid.UPID, pbMsg proto.Message) { driver.lock.Lock() defer driver.lock.Unlock() - h(from, pbMsg) + h(driver.context(), from, pbMsg) }) } @@ -145,6 +159,7 @@ func (driver *MesosExecutorDriver) init() error { driver.messenger.Install(guard(driver.frameworkMessage), &mesosproto.FrameworkToExecutorMessage{}) driver.messenger.Install(guard(driver.shutdown), &mesosproto.ShutdownExecutorMessage{}) driver.messenger.Install(guard(driver.frameworkError), &mesosproto.FrameworkErrorMessage{}) + driver.messenger.Install(guard(driver.networkError), &mesosproto.InternalNetworkError{}) return nil } @@ -185,7 +200,13 @@ func (driver *MesosExecutorDriver) parseEnviroments() error { if value == "1" { driver.checkpoint = true } - // TODO(yifan): Parse the duration. For now just use default. + + /* + if driver.checkpoint { + value = os.Getenv("MESOS_RECOVERY_TIMEOUT") + } + // TODO(yifan): Parse the duration. For now just use default. + */ return nil } @@ -214,7 +235,73 @@ func (driver *MesosExecutorDriver) Connected() bool { // --------------------- Message Handlers --------------------- // -func (driver *MesosExecutorDriver) registered(from *upid.UPID, pbMsg proto.Message) { +// networkError is invoked when there's a network-level error communicating with the mesos slave. +// The driver reacts by entering a "disconnected" state and invoking the Executor.Disconnected +// callback. The assumption is that if this driver was previously connected, and then there's a +// network error, then the slave process must be dying/dead. The native driver implementation makes +// this same assumption. I have some concerns that this may be a false-positive in some situations; +// some network errors (timeouts) may be indicative of something other than a dead slave process. +func (driver *MesosExecutorDriver) networkError(ctx context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Info("ignoring network error because aborted") + return + } + if driver.connected { + driver.connected = false + msg := pbMsg.(*mesosproto.InternalNetworkError) + session := msg.GetSession() + + if session != driver.connection.String() { + log.V(1).Infoln("ignoring netwok error for disconnected/stale session") + return + } + + if driver.checkpoint { + log.Infoln("slave disconnected, will wait for recovery") + driver.withExecutor(func(e Executor) { e.Disconnected(driver) }) + + if driver.recoveryTimer != nil { + driver.recoveryTimer.Stop() + } + t := time.NewTimer(driver.recoveryTimeout) + driver.recoveryTimer = t + go func() { + select { + case <-t.C: + // timer expired + driver.lock.Lock() + defer driver.lock.Unlock() + driver.recoveryTimedOut(session) + + case <-driver.stopCh: + // driver stopped + return + } + }() + return + } + } + log.Infoln("slave exited ... shutting down") + driver.withExecutor(func(e Executor) { e.Shutdown(driver) }) // abnormal shutdown + driver.abort() +} + +func (driver *MesosExecutorDriver) recoveryTimedOut(connection string) { + if driver.connected { + return + } + // ensure that connection ID's match otherwise we've been re-registered + if connection == driver.connection.String() { + log.Info("recovery timeout of %v exceeded; shutting down", driver.recoveryTimeout) + driver.shutdown(driver.context(), nil, nil) + } +} + +func (driver *MesosExecutorDriver) registered(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring registration message from slave because aborted") + return + } log.Infoln("Executor driver registered") msg := pbMsg.(*mesosproto.ExecutorRegisteredMessage) @@ -235,7 +322,11 @@ func (driver *MesosExecutorDriver) registered(from *upid.UPID, pbMsg proto.Messa driver.withExecutor(func(e Executor) { e.Registered(driver, executorInfo, frameworkInfo, slaveInfo) }) } -func (driver *MesosExecutorDriver) reregistered(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosExecutorDriver) reregistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring reregistration message from slave because aborted") + return + } log.Infoln("Executor driver reregistered") msg := pbMsg.(*mesosproto.ExecutorReregisteredMessage) @@ -254,11 +345,7 @@ func (driver *MesosExecutorDriver) reregistered(from *upid.UPID, pbMsg proto.Mes driver.withExecutor(func(e Executor) { e.Reregistered(driver, slaveInfo) }) } -func (driver *MesosExecutorDriver) send(upid *upid.UPID, msg proto.Message) error { - //TODO(jdef) should implement timeout here - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - +func (driver *MesosExecutorDriver) send(ctx context.Context, upid *upid.UPID, msg proto.Message) error { c := make(chan error, 1) go func() { c <- driver.messenger.Send(ctx, upid, msg) }() @@ -271,7 +358,11 @@ func (driver *MesosExecutorDriver) send(upid *upid.UPID, msg proto.Message) erro } } -func (driver *MesosExecutorDriver) reconnect(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosExecutorDriver) reconnect(ctx context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring reconnect message from slave because aborted") + return + } log.Infoln("Executor driver reconnect") msg := pbMsg.(*mesosproto.ReconnectExecutorMessage) @@ -298,12 +389,16 @@ func (driver *MesosExecutorDriver) reconnect(from *upid.UPID, pbMsg proto.Messag message.Tasks = append(message.Tasks, t) } // Send the message. - if err := driver.send(driver.slaveUPID, message); err != nil { + if err := driver.send(ctx, driver.slaveUPID, message); err != nil { log.Errorf("Failed to send %v: %v\n", message, err) } } -func (driver *MesosExecutorDriver) runTask(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosExecutorDriver) runTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring runTask message from slave because aborted") + return + } log.Infoln("Executor driver runTask") msg := pbMsg.(*mesosproto.RunTaskMessage) @@ -323,7 +418,11 @@ func (driver *MesosExecutorDriver) runTask(from *upid.UPID, pbMsg proto.Message) driver.withExecutor(func(e Executor) { e.LaunchTask(driver, task) }) } -func (driver *MesosExecutorDriver) killTask(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosExecutorDriver) killTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring killTask message from slave because aborted") + return + } log.Infoln("Executor driver killTask") msg := pbMsg.(*mesosproto.KillTaskMessage) @@ -338,7 +437,11 @@ func (driver *MesosExecutorDriver) killTask(from *upid.UPID, pbMsg proto.Message driver.withExecutor(func(e Executor) { e.KillTask(driver, taskID) }) } -func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring status update ack message because aborted") + return + } log.Infoln("Executor statusUpdateAcknowledgement") msg := pbMsg.(*mesosproto.StatusUpdateAcknowledgementMessage) @@ -359,7 +462,11 @@ func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID, delete(driver.tasks, taskID.String()) } -func (driver *MesosExecutorDriver) frameworkMessage(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosExecutorDriver) frameworkMessage(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring frameworkMessage message from slave because aborted") + return + } log.Infoln("Executor driver received frameworkMessage") msg := pbMsg.(*mesosproto.FrameworkToExecutorMessage) @@ -374,13 +481,12 @@ func (driver *MesosExecutorDriver) frameworkMessage(from *upid.UPID, pbMsg proto driver.withExecutor(func(e Executor) { e.FrameworkMessage(driver, string(data)) }) } -func (driver *MesosExecutorDriver) shutdown(from *upid.UPID, pbMsg proto.Message) { - log.Infoln("Executor driver received shutdown") - - _, ok := pbMsg.(*mesosproto.ShutdownExecutorMessage) - if !ok { - panic("Not a ShutdownExecutorMessage! This should not happen") +func (driver *MesosExecutorDriver) shutdown(_ context.Context, _ *upid.UPID, _ proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring shutdown message because aborted") + return } + log.Infoln("Executor driver received shutdown") if driver.stopped() { log.Infof("Ignoring shutdown message because the driver is stopped!\n") @@ -394,7 +500,11 @@ func (driver *MesosExecutorDriver) shutdown(from *upid.UPID, pbMsg proto.Message driver.stop() } -func (driver *MesosExecutorDriver) frameworkError(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosExecutorDriver) frameworkError(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + if driver.status == mesosproto.Status_DRIVER_ABORTED { + log.V(1).Infof("ignoring framework error message because aborted") + return + } log.Infoln("Executor driver received error") msg := pbMsg.(*mesosproto.FrameworkErrorMessage) @@ -433,7 +543,7 @@ func (driver *MesosExecutorDriver) start() (mesosproto.Status, error) { ExecutorId: driver.executorID, } - if err := driver.send(driver.slaveUPID, message); err != nil { + if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil { log.Errorf("Stopping the executor, failed to send %v: %v\n", message, err) err0 := driver._stop(driver.status) if err0 != nil { @@ -585,7 +695,7 @@ func (driver *MesosExecutorDriver) sendStatusUpdate(taskStatus *mesosproto.TaskS Pid: proto.String(driver.self.String()), } // Send the message. - if err := driver.send(driver.slaveUPID, message); err != nil { + if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil { log.Errorf("Failed to send %v: %v\n", message, err) return driver.status, err } @@ -632,7 +742,7 @@ func (driver *MesosExecutorDriver) sendFrameworkMessage(data string) (mesosproto } // Send the message. - if err := driver.send(driver.slaveUPID, message); err != nil { + if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil { log.Errorln("Failed to send message %v: %v", message, err) return driver.status, err } diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/authentication.pb.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/authentication.pb.go index 3950ce3a4e9..1d1b6b37c6a 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/authentication.pb.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/authentication.pb.go @@ -22,6 +22,7 @@ It has these top-level messages: InternalMasterChangeDetected InternalTryAuthentication InternalAuthenticationResult + InternalNetworkError FrameworkID OfferID SlaveID diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.pb.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.pb.go index 8988e89978b..c2eed23cc35 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.pb.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.pb.go @@ -75,3 +75,29 @@ func (m *InternalAuthenticationResult) GetPid() string { } return "" } + +type InternalNetworkError struct { + // master pid that this event pertains to + Pid *string `protobuf:"bytes,1,req,name=pid" json:"pid,omitempty"` + // driver session UUID + Session *string `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *InternalNetworkError) Reset() { *m = InternalNetworkError{} } +func (m *InternalNetworkError) String() string { return proto.CompactTextString(m) } +func (*InternalNetworkError) ProtoMessage() {} + +func (m *InternalNetworkError) GetPid() string { + if m != nil && m.Pid != nil { + return *m.Pid + } + return "" +} + +func (m *InternalNetworkError) GetSession() string { + if m != nil && m.Session != nil { + return *m.Session + } + return "" +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.proto b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.proto index ae0cdaec3e6..e988585999b 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.proto +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesosproto/internal.proto @@ -21,3 +21,10 @@ message InternalAuthenticationResult { // master pid that this result pertains to required string pid = 3; } + +message InternalNetworkError { + // master pid that this event pertains to + required string pid = 1; + // driver session UUID + optional string session = 2; +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/decoder.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/decoder.go index df1661bb2cc..a1f8bac75ae 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/decoder.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/decoder.go @@ -20,9 +20,6 @@ import ( ) const ( - DefaultReadTimeout = 5 * time.Second - DefaultWriteTimeout = 5 * time.Second - // writeFlushPeriod is the amount of time we're willing to wait for a single // response buffer to be fully written to the underlying TCP connection; after // this amount of time the remaining bytes of the response are discarded. see @@ -43,13 +40,8 @@ func (did *decoderID) next() decoderID { var ( errHijackFailed = errors.New("failed to hijack http connection") did decoderID // decoder ID counter - closedChan = make(chan struct{}) ) -func init() { - close(closedChan) -} - type Decoder interface { Requests() <-chan *Request Err() <-chan error @@ -99,8 +91,8 @@ func DecodeHTTP(w http.ResponseWriter, r *http.Request) Decoder { req: r, shouldQuit: make(chan struct{}), forceQuit: make(chan struct{}), - readTimeout: DefaultReadTimeout, - writeTimeout: DefaultWriteTimeout, + readTimeout: ReadTimeout, + writeTimeout: WriteTimeout, idtag: id.String(), outCh: make(chan *bytes.Buffer), } diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go index 98a2b0af663..0214d96edde 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go @@ -20,6 +20,7 @@ package messenger import ( "bytes" + "crypto/tls" "encoding/hex" "errors" "fmt" @@ -27,11 +28,9 @@ import ( "io/ioutil" "net" "net/http" - "net/url" "os" "strings" "sync" - "syscall" "time" log "github.com/golang/glog" @@ -39,30 +38,39 @@ import ( "golang.org/x/net/context" ) +const ( + DefaultReadTimeout = 10 * time.Second + DefaultWriteTimeout = 10 * time.Second +) + var ( + ReadTimeout = DefaultReadTimeout + WriteTimeout = DefaultWriteTimeout + discardOnStopError = fmt.Errorf("discarding message because transport is shutting down") errNotStarted = errors.New("HTTP transport has not been started") errTerminal = errors.New("HTTP transport is terminated") errAlreadyRunning = errors.New("HTTP transport is already running") - httpTransport, httpClient = &http.Transport{ + httpTransport = http.Transport{ Dial: (&net.Dialer{ Timeout: 10 * time.Second, KeepAlive: 30 * time.Second, }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - }, - &http.Client{ - Transport: httpTransport, - Timeout: DefaultReadTimeout, - } + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: DefaultReadTimeout, + } + + // HttpClient is used for sending messages to remote processes + HttpClient = http.Client{ + Timeout: DefaultReadTimeout, + } ) // httpTransporter is a subset of the Transporter interface type httpTransporter interface { Send(ctx context.Context, msg *Message) error Recv() (*Message, error) - Inject(ctx context.Context, msg *Message) error Install(messageName string) Start() (upid.UPID, <-chan error) Stop(graceful bool) error @@ -80,11 +88,10 @@ type runningState struct { /* -- not-started state */ -func (s *notStartedState) Send(ctx context.Context, msg *Message) error { return errNotStarted } -func (s *notStartedState) Recv() (*Message, error) { return nil, errNotStarted } -func (s *notStartedState) Inject(ctx context.Context, msg *Message) error { return errNotStarted } -func (s *notStartedState) Stop(graceful bool) error { return errNotStarted } -func (s *notStartedState) Install(messageName string) { s.h.install(messageName) } +func (s *notStartedState) Send(ctx context.Context, msg *Message) error { return errNotStarted } +func (s *notStartedState) Recv() (*Message, error) { return nil, errNotStarted } +func (s *notStartedState) Stop(graceful bool) error { return errNotStarted } +func (s *notStartedState) Install(messageName string) { s.h.install(messageName) } func (s *notStartedState) Start() (upid.UPID, <-chan error) { s.h.state = &runningState{s} return s.h.start() @@ -92,11 +99,10 @@ func (s *notStartedState) Start() (upid.UPID, <-chan error) { /* -- stopped state */ -func (s *stoppedState) Send(ctx context.Context, msg *Message) error { return errTerminal } -func (s *stoppedState) Recv() (*Message, error) { return nil, errTerminal } -func (s *stoppedState) Inject(ctx context.Context, msg *Message) error { return errTerminal } -func (s *stoppedState) Stop(graceful bool) error { return errTerminal } -func (s *stoppedState) Install(messageName string) {} +func (s *stoppedState) Send(ctx context.Context, msg *Message) error { return errTerminal } +func (s *stoppedState) Recv() (*Message, error) { return nil, errTerminal } +func (s *stoppedState) Stop(graceful bool) error { return errTerminal } +func (s *stoppedState) Install(messageName string) {} func (s *stoppedState) Start() (upid.UPID, <-chan error) { ch := make(chan error, 1) ch <- errTerminal @@ -105,9 +111,8 @@ func (s *stoppedState) Start() (upid.UPID, <-chan error) { /* -- running state */ -func (s *runningState) Send(ctx context.Context, msg *Message) error { return s.h.send(ctx, msg) } -func (s *runningState) Recv() (*Message, error) { return s.h.recv() } -func (s *runningState) Inject(ctx context.Context, msg *Message) error { return s.h.inject(ctx, msg) } +func (s *runningState) Send(ctx context.Context, msg *Message) error { return s.h.send(ctx, msg) } +func (s *runningState) Recv() (*Message, error) { return s.h.recv() } func (s *runningState) Stop(graceful bool) error { s.h.state = &stoppedState{} return s.h.stop(graceful) @@ -118,6 +123,9 @@ func (s *runningState) Start() (upid.UPID, <-chan error) { return upid.UPID{}, ch } +// httpOpt is a functional option type +type httpOpt func(*HTTPTransporter) + // HTTPTransporter implements the interfaces of the Transporter. type HTTPTransporter struct { // If the host is empty("") then it will listen on localhost. @@ -132,118 +140,110 @@ type HTTPTransporter struct { shouldQuit chan struct{} stateLock sync.RWMutex // protect lifecycle (start/stop) funcs state httpTransporter + server *http.Server } // NewHTTPTransporter creates a new http transporter with an optional binding address. -func NewHTTPTransporter(upid upid.UPID, address net.IP) *HTTPTransporter { +func NewHTTPTransporter(upid upid.UPID, address net.IP, opts ...httpOpt) *HTTPTransporter { + transport := httpTransport + client := HttpClient + client.Transport = &transport + mux := http.NewServeMux() + result := &HTTPTransporter{ upid: upid, messageQueue: make(chan *Message, defaultQueueSize), - mux: http.NewServeMux(), - client: httpClient, - tr: httpTransport, + mux: mux, + client: &client, + tr: &transport, address: address, shouldQuit: make(chan struct{}), + server: &http.Server{ + ReadTimeout: ReadTimeout, + WriteTimeout: WriteTimeout, + Handler: mux, + }, + } + for _, f := range opts { + f(result) } result.state = ¬StartedState{result} return result } +func ServerTLSConfig(config *tls.Config, nextProto map[string]func(*http.Server, *tls.Conn, http.Handler)) httpOpt { + return func(transport *HTTPTransporter) { + transport.server.TLSConfig = config + transport.server.TLSNextProto = nextProto + } +} + +func ClientTLSConfig(config *tls.Config, handshakeTimeout time.Duration) httpOpt { + return func(transport *HTTPTransporter) { + transport.tr.TLSClientConfig = config + transport.tr.TLSHandshakeTimeout = handshakeTimeout + } +} + func (t *HTTPTransporter) getState() httpTransporter { t.stateLock.RLock() defer t.stateLock.RUnlock() return t.state } -// some network errors are probably recoverable, attempt to determine that here. -func isRecoverableError(err error) bool { - if urlErr, ok := err.(*url.Error); ok { - log.V(2).Infof("checking url.Error for recoverability") - return urlErr.Op == "Post" && isRecoverableError(urlErr.Err) - } else if netErr, ok := err.(*net.OpError); ok && netErr.Err != nil { - log.V(2).Infof("checking net.OpError for recoverability: %#v", err) - if netErr.Temporary() { - return true - } - //TODO(jdef) this is pretty hackish, there's probably a better way - //TODO(jdef) should also check for EHOSTDOWN and EHOSTUNREACH - return (netErr.Op == "dial" && netErr.Net == "tcp" && netErr.Err == syscall.ECONNREFUSED) - } - log.V(2).Infof("unrecoverable error: %#v", err) - return false -} - -type recoverableError struct { - Err error -} - -func (e *recoverableError) Error() string { - if e == nil { - return "" - } - return e.Err.Error() -} - // Send sends the message to its specified upid. func (t *HTTPTransporter) Send(ctx context.Context, msg *Message) (sendError error) { return t.getState().Send(ctx, msg) } +type mesosError struct { + errorCode int + upid string + uri string + status string +} + +func (e *mesosError) Error() string { + return fmt.Sprintf("master %s rejected %s, returned status %q", + e.upid, e.uri, e.status) +} + +type networkError struct { + cause error +} + +func (e *networkError) Error() string { + return e.cause.Error() +} + +// send delivers a message to a mesos component via HTTP, returns a mesosError if the +// communication with the remote process was successful but rejected. A networkError +// error indicates that communication with the remote process failed at the network layer. func (t *HTTPTransporter) send(ctx context.Context, msg *Message) (sendError error) { log.V(2).Infof("Sending message to %v via http\n", msg.UPID) req, err := t.makeLibprocessRequest(msg) if err != nil { - log.Errorf("Failed to make libprocess request: %v\n", err) return err } - duration := 1 * time.Second - for attempt := 0; attempt < 5; attempt++ { //TODO(jdef) extract/parameterize constant - if sendError != nil { - duration *= 2 - log.Warningf("attempting to recover from error '%v', waiting before retry: %v", sendError, duration) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(duration): - // ..retry request, continue - case <-t.shouldQuit: - return discardOnStopError - } - } - sendError = t.httpDo(ctx, req, func(resp *http.Response, err error) error { - if err != nil { - if isRecoverableError(err) { - return &recoverableError{Err: err} - } - log.Infof("Failed to POST: %v\n", err) - return err - } - defer resp.Body.Close() - // ensure master acknowledgement. - if (resp.StatusCode != http.StatusOK) && - (resp.StatusCode != http.StatusAccepted) { - msg := fmt.Sprintf("Master %s rejected %s. Returned status %s.", - msg.UPID, msg.RequestURI(), resp.Status) - log.Warning(msg) - return fmt.Errorf(msg) - } - return nil - }) - if sendError == nil { - // success - return - } else if _, ok := sendError.(*recoverableError); ok { - // recoverable, attempt backoff? - continue + return t.httpDo(ctx, req, func(resp *http.Response, err error) error { + if err != nil { + log.V(1).Infof("Failed to POST: %v\n", err) + return &networkError{err} } - // unrecoverable - break - } - if recoverable, ok := sendError.(*recoverableError); ok { - sendError = recoverable.Err - } - return + defer resp.Body.Close() + + // ensure master acknowledgement. + if (resp.StatusCode != http.StatusOK) && (resp.StatusCode != http.StatusAccepted) { + return &mesosError{ + errorCode: resp.StatusCode, + upid: msg.UPID.String(), + uri: msg.RequestURI(), + status: resp.Status, + } + } + return nil + }) } func (t *HTTPTransporter) httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error { @@ -289,30 +289,6 @@ func (t *HTTPTransporter) recv() (*Message, error) { return nil, discardOnStopError } -//Inject places a message into the incoming message queue. -func (t *HTTPTransporter) Inject(ctx context.Context, msg *Message) error { - return t.getState().Inject(ctx, msg) -} - -func (t *HTTPTransporter) inject(ctx context.Context, msg *Message) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-t.shouldQuit: - return discardOnStopError - default: // continue - } - - select { - case t.messageQueue <- msg: - return nil - case <-ctx.Done(): - return ctx.Err() - case <-t.shouldQuit: - return discardOnStopError - } -} - // Install the request URI according to the message's name. func (t *HTTPTransporter) Install(msgName string) { t.getState().Install(msgName) @@ -439,12 +415,7 @@ func (t *HTTPTransporter) start() (upid.UPID, <-chan error) { // TODO(yifan): Set read/write deadline. go func() { - s := &http.Server{ - ReadTimeout: DefaultReadTimeout, - WriteTimeout: DefaultWriteTimeout, - Handler: t.mux, - } - err := s.Serve(t.listener) + err := t.server.Serve(t.listener) select { case <-t.shouldQuit: log.V(1).Infof("HTTP server stopped because of shutdown") @@ -587,7 +558,7 @@ func (t *HTTPTransporter) makeLibprocessRequest(msg *Message) (*http.Request, er log.V(2).Infof("libproc target URL %s", targetURL) req, err := http.NewRequest("POST", targetURL, bytes.NewReader(msg.Bytes)) if err != nil { - log.Errorf("Failed to create request: %v\n", err) + log.V(1).Infof("Failed to create request: %v\n", err) return nil, err } if !msg.isV1API() { diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go index e275fda7569..066464b3c44 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go @@ -30,6 +30,7 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosproto/scheduler" "github.com/mesos/mesos-go/mesosutil/process" + "github.com/mesos/mesos-go/messenger/sessionid" "github.com/mesos/mesos-go/upid" "golang.org/x/net/context" ) @@ -52,16 +53,19 @@ type Messenger interface { UPID() upid.UPID } +type errorHandlerFunc func(context.Context, *Message, error) error +type dispatchFunc func(errorHandlerFunc) + // MesosMessenger is an implementation of the Messenger interface. type MesosMessenger struct { upid upid.UPID - encodingQueue chan *Message - sendingQueue chan *Message + sendingQueue chan dispatchFunc installedMessages map[string]reflect.Type installedHandlers map[string]MessageHandler stop chan struct{} stopOnce sync.Once tr Transporter + guardHandlers sync.RWMutex // protect simultaneous changes to messages/handlers maps } // ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to @@ -137,18 +141,17 @@ func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) } // NewMesosMessenger creates a new mesos messenger. -func NewHttp(upid upid.UPID) *MesosMessenger { - return NewHttpWithBindingAddress(upid, nil) +func NewHttp(upid upid.UPID, opts ...httpOpt) *MesosMessenger { + return NewHttpWithBindingAddress(upid, nil, opts...) } -func NewHttpWithBindingAddress(upid upid.UPID, address net.IP) *MesosMessenger { - return New(upid, NewHTTPTransporter(upid, address)) +func NewHttpWithBindingAddress(upid upid.UPID, address net.IP, opts ...httpOpt) *MesosMessenger { + return New(NewHTTPTransporter(upid, address, opts...)) } -func New(upid upid.UPID, t Transporter) *MesosMessenger { +func New(t Transporter) *MesosMessenger { return &MesosMessenger{ - encodingQueue: make(chan *Message, defaultQueueSize), - sendingQueue: make(chan *Message, defaultQueueSize), + sendingQueue: make(chan dispatchFunc, defaultQueueSize), installedMessages: make(map[string]reflect.Type), installedHandlers: make(map[string]MessageHandler), tr: t, @@ -168,6 +171,10 @@ func (m *MesosMessenger) Install(handler MessageHandler, msg proto.Message) erro if _, ok := m.installedMessages[name]; ok { return fmt.Errorf("Message %v is already installed", name) } + + m.guardHandlers.Lock() + defer m.guardHandlers.Unlock() + m.installedMessages[name] = mtype.Elem() m.installedHandlers[name] = handler m.tr.Install(name) @@ -184,12 +191,27 @@ func (m *MesosMessenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Me } else if *upid == m.upid { return fmt.Errorf("Send the message to self") } + + b, err := proto.Marshal(msg) + if err != nil { + return err + } + name := getMessageName(msg) log.V(2).Infof("Sending message %v to %v\n", name, upid) + + wrapped := &Message{upid, name, msg, b} + d := dispatchFunc(func(rf errorHandlerFunc) { + err := m.tr.Send(ctx, wrapped) + err = rf(ctx, wrapped, err) + if err != nil { + m.reportError("send", wrapped, err) + } + }) select { case <-ctx.Done(): return ctx.Err() - case m.encodingQueue <- &Message{upid, name, msg, nil}: + case m.sendingQueue <- d: return nil } } @@ -206,14 +228,18 @@ func (m *MesosMessenger) Route(ctx context.Context, upid *upid.UPID, msg proto.M return m.Send(ctx, upid, msg) } - // TODO(jdef) this has an unfortunate performance impact for self-messaging. implement - // something more reasonable here. - data, err := proto.Marshal(msg) - if err != nil { - return err - } name := getMessageName(msg) - return m.tr.Inject(ctx, &Message{upid, name, msg, data}) + log.V(2).Infof("routing message %q to self", name) + + _, handler, ok := m.messageBinding(name) + if !ok { + return fmt.Errorf("failed to route message, no message binding for %q", name) + } + + // the implication of this is that messages can be delivered to self even if the + // messenger has been stopped. is that OK? + go handler(upid, msg) + return nil } // Start starts the messenger; expects to be called once and only once. @@ -231,7 +257,6 @@ func (m *MesosMessenger) Start() error { m.upid = pid go m.sendLoop() - go m.encodeLoop() go m.decodeLoop() // wait for a listener error or a stop signal; either way stop the messenger @@ -267,7 +292,7 @@ func (m *MesosMessenger) Stop() (err error) { defer close(m.stop) } - log.Info("stopping messenger..") + log.Infof("stopping messenger %v..", m.upid) //TODO(jdef) don't hardcode the graceful flag here if err2 := m.tr.Stop(true); err2 != nil && err2 != errTerminal { @@ -283,53 +308,14 @@ func (m *MesosMessenger) UPID() upid.UPID { return m.upid } -func (m *MesosMessenger) encodeLoop() { - for { - select { - case <-m.stop: - return - case msg := <-m.encodingQueue: - e := func() error { - //TODO(jdef) implement timeout for context - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - b, err := proto.Marshal(msg.ProtoMessage) - if err != nil { - return err - } - msg.Bytes = b - select { - case <-ctx.Done(): - return ctx.Err() - case m.sendingQueue <- msg: - return nil - } - }() - if e != nil { - m.reportError(fmt.Errorf("Failed to enqueue message %v: %v", msg, e)) - } - } - } -} - -func (m *MesosMessenger) reportError(err error) { - log.V(2).Info(err) - //TODO(jdef) implement timeout for context - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - c := make(chan error, 1) - pid := m.upid - go func() { c <- m.Route(ctx, &pid, &mesos.FrameworkErrorMessage{Message: proto.String(err.Error())}) }() - select { - case <-ctx.Done(): - <-c // wait for Route to return - case e := <-c: - if e != nil { - log.Errorf("failed to report error %v due to: %v", err, e) - } +func (m *MesosMessenger) reportError(action string, msg *Message, err error) { + // log message transmission errors but don't shoot the messenger. + // this approach essentially drops all undelivered messages on the floor. + name := "" + if msg != nil { + name = msg.Name } + log.Errorf("failed to %s message %q: %+v", action, name, err) } func (m *MesosMessenger) sendLoop() { @@ -337,27 +323,28 @@ func (m *MesosMessenger) sendLoop() { select { case <-m.stop: return - case msg := <-m.sendingQueue: - e := func() error { - //TODO(jdef) implement timeout for context - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - c := make(chan error, 1) - go func() { c <- m.tr.Send(ctx, msg) }() - - select { - case <-ctx.Done(): - // Transport layer must use the context to detect cancelled requests. - <-c // wait for Send to return - return ctx.Err() - case err := <-c: - return err + case f := <-m.sendingQueue: + f(errorHandlerFunc(func(ctx context.Context, msg *Message, err error) error { + if _, ok := err.(*networkError); ok { + // if transport reports a network error, then + // we're probably disconnected from the remote process? + pid := msg.UPID.String() + neterr := &mesos.InternalNetworkError{Pid: &pid} + sessionID, ok := sessionid.FromContext(ctx) + if ok { + neterr.Session = &sessionID + } + log.V(1).Infof("routing network error for pid %q session %q", pid, sessionID) + err2 := m.Route(ctx, &m.upid, neterr) + if err2 != nil { + log.Error(err2) + } else { + log.V(1).Infof("swallowing raw error because we're reporting a networkError: %v", err) + return nil + } } - }() - if e != nil { - m.reportError(fmt.Errorf("Failed to send message %v: %v", msg.Name, e)) - } + return err + })) } } } @@ -379,17 +366,42 @@ func (m *MesosMessenger) decodeLoop() { panic(fmt.Sprintf("unexpected transport error: %v", err)) } } + log.V(2).Infof("Receiving message %v from %v\n", msg.Name, msg.UPID) - msg.ProtoMessage = reflect.New(m.installedMessages[msg.Name]).Interface().(proto.Message) + protoMessage, handler, found := m.messageBinding(msg.Name) + if !found { + log.Warningf("no message binding for message %q", msg.Name) + continue + } + + msg.ProtoMessage = protoMessage if err := proto.Unmarshal(msg.Bytes, msg.ProtoMessage); err != nil { log.Errorf("Failed to unmarshal message %v: %v\n", msg, err) continue } - // TODO(yifan): Catch panic. - m.installedHandlers[msg.Name](msg.UPID, msg.ProtoMessage) + + handler(msg.UPID, msg.ProtoMessage) } } +func (m *MesosMessenger) messageBinding(name string) (proto.Message, MessageHandler, bool) { + m.guardHandlers.RLock() + defer m.guardHandlers.RUnlock() + + gotype, ok := m.installedMessages[name] + if !ok { + return nil, nil, false + } + + handler, ok := m.installedHandlers[name] + if !ok { + return nil, nil, false + } + + protoMessage := reflect.New(gotype).Interface().(proto.Message) + return protoMessage, handler, true +} + // getMessageName returns the name of the message in the mesos manner. func getMessageName(msg proto.Message) string { var msgName string diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/sessionid/sessionid.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/sessionid/sessionid.go new file mode 100644 index 00000000000..ff48b7080a9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/sessionid/sessionid.go @@ -0,0 +1,18 @@ +package sessionid + +import ( + "golang.org/x/net/context" +) + +type key int + +const sessionIDKey = 0 + +func NewContext(ctx context.Context, sessionID string) context.Context { + return context.WithValue(ctx, sessionIDKey, sessionID) +} + +func FromContext(ctx context.Context) (string, bool) { + sessionID, ok := ctx.Value(sessionIDKey).(string) + return sessionID, ok +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/transporter.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/transporter.go index cfe80ec2158..efa1a10db6a 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/transporter.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/transporter.go @@ -33,11 +33,6 @@ type Transporter interface { //Will stop receiving when transport is stopped. Recv() (*Message, error) - //Inject injects a message to the incoming queue. Must use context to - //determine cancelled requests. Injection is aborted if the transport - //is stopped. - Inject(ctx context.Context, msg *Message) error - //Install mount an handler based on incoming message name. Install(messageName string) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go index f1a88f348f6..bc0ba660e17 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go @@ -38,19 +38,21 @@ import ( util "github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil/process" "github.com/mesos/mesos-go/messenger" + "github.com/mesos/mesos-go/messenger/sessionid" "github.com/mesos/mesos-go/upid" "github.com/pborman/uuid" "golang.org/x/net/context" ) const ( - authTimeout = 5 * time.Second // timeout interval for an authentication attempt + defaultAuthenticationTimeout = 30 * time.Second // timeout interval for an authentication attempt registrationRetryIntervalMax = float64(1 * time.Minute) registrationBackoffFactor = 2 * time.Second ) var ( - authenticationCanceledError = errors.New("authentication canceled") + ErrDisconnected = errors.New("disconnected from mesos master") + errAuthenticationCanceled = errors.New("authentication canceled") ) type ErrDriverAborted struct { @@ -147,7 +149,7 @@ type MesosSchedulerDriver struct { dispatch func(context.Context, *upid.UPID, proto.Message) error // send a message somewhere started chan struct{} // signal chan that closes upon a successful call to Start() eventLock sync.RWMutex // guard for all driver state - withScheduler func(f func(s Scheduler)) // execute some func with respect to the given scheduler + withScheduler func(f func(s Scheduler)) // execute some func with respect to the given scheduler; should be the last thing invoked in a handler (lock semantics) done chan struct{} // signal chan that closes when no more events will be processed } @@ -319,17 +321,27 @@ func (driver *MesosSchedulerDriver) makeWithScheduler(cs Scheduler) func(func(Sc } } +// ctx returns the current context.Context for the driver, expects to be invoked +// only when eventLock is locked. +func (driver *MesosSchedulerDriver) context() context.Context { + // set a "session" attribute so that the messenger can see it + // and use it for reporting delivery errors. + return sessionid.NewContext(context.TODO(), driver.connection.String()) +} + // init initializes the driver. func (driver *MesosSchedulerDriver) init() error { log.Infof("Initializing mesos scheduler driver\n") driver.dispatch = driver.messenger.Send // serialize all callbacks from the messenger - guarded := func(h messenger.MessageHandler) messenger.MessageHandler { + type messageHandler func(context.Context, *upid.UPID, proto.Message) + + guarded := func(h messageHandler) messenger.MessageHandler { return messenger.MessageHandler(func(from *upid.UPID, msg proto.Message) { driver.eventLock.Lock() defer driver.eventLock.Unlock() - h(from, msg) + h(driver.context(), from, msg) }) } @@ -345,11 +357,41 @@ func (driver *MesosSchedulerDriver) init() error { driver.messenger.Install(guarded(driver.exitedExecutor), &mesos.ExitedExecutorMessage{}) driver.messenger.Install(guarded(driver.handleMasterChanged), &mesos.InternalMasterChangeDetected{}) driver.messenger.Install(guarded(driver.handleAuthenticationResult), &mesos.InternalAuthenticationResult{}) + driver.messenger.Install(guarded(driver.handleNetworkError), &mesos.InternalNetworkError{}) return nil } +func (driver *MesosSchedulerDriver) handleNetworkError(_ context.Context, from *upid.UPID, pbMsg proto.Message) { + msg := pbMsg.(*mesos.InternalNetworkError) + + if driver.status == mesos.Status_DRIVER_ABORTED { + log.Info("ignoring network error because the driver is aborted.") + return + } else if !from.Equal(driver.self) { + log.Errorf("ignoring network error because message received from upid '%v'", from) + return + } else if !driver.connected { + log.V(1).Infof("ignoring network error since we're not currently connected") + return + } + + if driver.masterPid.String() == msg.GetPid() && driver.connection.String() == msg.GetSession() { + // fire a disconnection event + log.V(3).Info("Disconnecting scheduler.") + + // need to set all 3 of these at once, since withScheduler() temporarily releases the lock and we don't + // want inconsistent connection facts + driver.masterPid = nil + driver.connected = false + driver.authenticated = false + + driver.withScheduler(func(s Scheduler) { s.Disconnected(driver) }) + log.Info("master disconnected") + } +} + // lead master detection callback. -func (driver *MesosSchedulerDriver) handleMasterChanged(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) handleMasterChanged(_ context.Context, from *upid.UPID, pbMsg proto.Message) { if driver.status == mesos.Status_DRIVER_ABORTED { log.Info("Ignoring master change because the driver is aborted.") return @@ -359,18 +401,20 @@ func (driver *MesosSchedulerDriver) handleMasterChanged(from *upid.UPID, pbMsg p } // Reconnect every time a master is detected. - if driver.connected { + wasConnected := driver.connected + driver.connected = false + driver.authenticated = false + + alertScheduler := false + if wasConnected { log.V(3).Info("Disconnecting scheduler.") driver.masterPid = nil - driver.withScheduler(func(s Scheduler) { s.Disconnected(driver) }) + alertScheduler = true } msg := pbMsg.(*mesos.InternalMasterChangeDetected) master := msg.Master - driver.connected = false - driver.authenticated = false - if master != nil { log.Infof("New master %s detected\n", master.GetPid()) @@ -380,10 +424,13 @@ func (driver *MesosSchedulerDriver) handleMasterChanged(from *upid.UPID, pbMsg p } driver.masterPid = pid // save for downstream ops. - driver.tryAuthentication() + defer driver.tryAuthentication() } else { log.Infoln("No master detected.") } + if alertScheduler { + driver.withScheduler(func(s Scheduler) { s.Disconnected(driver) }) + } } // tryAuthentication expects to be guarded by eventLock @@ -439,7 +486,7 @@ func (driver *MesosSchedulerDriver) tryAuthentication() { } } -func (driver *MesosSchedulerDriver) handleAuthenticationResult(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) handleAuthenticationResult(_ context.Context, from *upid.UPID, pbMsg proto.Message) { if driver.status != mesos.Status_DRIVER_RUNNING { log.V(1).Info("ignoring authenticate because driver is not running") return @@ -506,7 +553,7 @@ func (driver *MesosSchedulerDriver) stopped() bool { } // ---------------------- Handlers for Events from Master --------------- // -func (driver *MesosSchedulerDriver) frameworkRegistered(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) frameworkRegistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(2).Infoln("Handling scheduler driver framework registered event.") msg := pbMsg.(*mesos.FrameworkRegisteredMessage) @@ -542,7 +589,7 @@ func (driver *MesosSchedulerDriver) frameworkRegistered(from *upid.UPID, pbMsg p driver.withScheduler(func(s Scheduler) { s.Registered(driver, frameworkId, masterInfo) }) } -func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) frameworkReregistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(1).Infoln("Handling Scheduler re-registered event.") msg := pbMsg.(*mesos.FrameworkReregisteredMessage) @@ -566,10 +613,9 @@ func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg driver.connection = uuid.NewUUID() driver.withScheduler(func(s Scheduler) { s.Reregistered(driver, msg.GetMasterInfo()) }) - } -func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) resourcesOffered(_ context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(2).Infoln("Handling resource offers.") msg := pbMsg.(*mesos.ResourceOffersMessage) @@ -601,7 +647,7 @@ func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg prot driver.withScheduler(func(s Scheduler) { s.ResourceOffers(driver, msg.Offers) }) } -func (driver *MesosSchedulerDriver) resourceOfferRescinded(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) resourceOfferRescinded(_ context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(1).Infoln("Handling resource offer rescinded.") msg := pbMsg.(*mesos.RescindResourceOfferMessage) @@ -623,11 +669,7 @@ func (driver *MesosSchedulerDriver) resourceOfferRescinded(from *upid.UPID, pbMs driver.withScheduler(func(s Scheduler) { s.OfferRescinded(driver, msg.OfferId) }) } -func (driver *MesosSchedulerDriver) send(upid *upid.UPID, msg proto.Message) error { - //TODO(jdef) should implement timeout here - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - +func (driver *MesosSchedulerDriver) send(ctx context.Context, upid *upid.UPID, msg proto.Message) error { c := make(chan error, 1) go func() { c <- driver.dispatch(ctx, upid, msg) }() @@ -641,7 +683,7 @@ func (driver *MesosSchedulerDriver) send(upid *upid.UPID, msg proto.Message) err } // statusUpdated expects to be guarded by eventLock -func (driver *MesosSchedulerDriver) statusUpdated(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) statusUpdated(ctx context.Context, from *upid.UPID, pbMsg proto.Message) { msg := pbMsg.(*mesos.StatusUpdateMessage) if driver.status != mesos.Status_DRIVER_RUNNING { @@ -679,35 +721,33 @@ func (driver *MesosSchedulerDriver) statusUpdated(from *upid.UPID, pbMsg proto.M status.Uuid = msg.Update.Uuid } - driver.withScheduler(func(s Scheduler) { s.StatusUpdate(driver, status) }) - if driver.status == mesos.Status_DRIVER_ABORTED { log.V(1).Infoln("Not sending StatusUpdate ACK, the driver is aborted!") - return - } - - // Send StatusUpdate Acknowledgement; see above for the rules. - // Only send ACK if udpate was not from this driver and spec'd a UUID; this is compat w/ 0.23+ - ackRequired := len(msg.Update.Uuid) > 0 && !from.Equal(driver.self) && msg.GetPid() != driver.self.String() - if ackRequired { - ackMsg := &mesos.StatusUpdateAcknowledgementMessage{ - SlaveId: msg.Update.SlaveId, - FrameworkId: driver.frameworkInfo.Id, - TaskId: msg.Update.Status.TaskId, - Uuid: msg.Update.Uuid, - } - - log.V(2).Infof("Sending ACK for status update %+v to %q", *msg.Update, from.String()) - if err := driver.send(driver.masterPid, ackMsg); err != nil { - log.Errorf("Failed to send StatusUpdate ACK message: %v", err) - return - } } else { - log.V(2).Infof("Not sending ACK, update is not from slave %q", from.String()) + + // Send StatusUpdate Acknowledgement; see above for the rules. + // Only send ACK if udpate was not from this driver and spec'd a UUID; this is compat w/ 0.23+ + ackRequired := len(msg.Update.Uuid) > 0 && !from.Equal(driver.self) && msg.GetPid() != driver.self.String() + if ackRequired { + ackMsg := &mesos.StatusUpdateAcknowledgementMessage{ + SlaveId: msg.Update.SlaveId, + FrameworkId: driver.frameworkInfo.Id, + TaskId: msg.Update.Status.TaskId, + Uuid: msg.Update.Uuid, + } + + log.V(2).Infof("Sending ACK for status update %+v to %q", *msg.Update, from.String()) + if err := driver.send(ctx, driver.masterPid, ackMsg); err != nil { + log.Errorf("Failed to send StatusUpdate ACK message: %v", err) + } + } else { + log.V(2).Infof("Not sending ACK, update is not from slave %q", from.String()) + } } + driver.withScheduler(func(s Scheduler) { s.StatusUpdate(driver, status) }) } -func (driver *MesosSchedulerDriver) exitedExecutor(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) exitedExecutor(_ context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(1).Infoln("Handling ExitedExceutor event.") msg := pbMsg.(*mesos.ExitedExecutorMessage) @@ -728,7 +768,7 @@ func (driver *MesosSchedulerDriver) exitedExecutor(from *upid.UPID, pbMsg proto. driver.withScheduler(func(s Scheduler) { s.ExecutorLost(driver, msg.GetExecutorId(), msg.GetSlaveId(), int(status)) }) } -func (driver *MesosSchedulerDriver) slaveLost(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) slaveLost(_ context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(1).Infoln("Handling LostSlave event.") msg := pbMsg.(*mesos.LostSlaveMessage) @@ -751,7 +791,7 @@ func (driver *MesosSchedulerDriver) slaveLost(from *upid.UPID, pbMsg proto.Messa driver.withScheduler(func(s Scheduler) { s.SlaveLost(driver, msg.SlaveId) }) } -func (driver *MesosSchedulerDriver) frameworkMessageRcvd(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) frameworkMessageRcvd(_ context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(1).Infoln("Handling framework message event.") msg := pbMsg.(*mesos.ExecutorToFrameworkMessage) @@ -766,10 +806,10 @@ func (driver *MesosSchedulerDriver) frameworkMessageRcvd(from *upid.UPID, pbMsg driver.withScheduler(func(s Scheduler) { s.FrameworkMessage(driver, msg.ExecutorId, msg.SlaveId, string(msg.Data)) }) } -func (driver *MesosSchedulerDriver) frameworkErrorRcvd(from *upid.UPID, pbMsg proto.Message) { +func (driver *MesosSchedulerDriver) frameworkErrorRcvd(ctx context.Context, from *upid.UPID, pbMsg proto.Message) { log.V(1).Infoln("Handling framework error event.") msg := pbMsg.(*mesos.FrameworkErrorMessage) - driver.error(msg.GetMessage()) + driver.error(ctx, msg.GetMessage()) } // ---------------------- Interface Methods ---------------------- // @@ -828,7 +868,7 @@ func (driver *MesosSchedulerDriver) start() (mesos.Status, error) { // authenticate against the spec'd master pid using the configured authenticationProvider. // the authentication process is canceled upon either cancelation of authenticating, or -// else because it timed out (authTimeout). +// else because it timed out (see defaultAuthenticationTimeout, auth.Timeout). // // TODO(jdef) perhaps at some point in the future this will get pushed down into // the messenger layer (e.g. to use HTTP-based authentication). We'd probably still @@ -837,17 +877,28 @@ func (driver *MesosSchedulerDriver) start() (mesos.Status, error) { // func (driver *MesosSchedulerDriver) authenticate(pid *upid.UPID, authenticating *authenticationAttempt) error { log.Infof("authenticating with master %v", pid) - ctx, cancel := context.WithTimeout(context.Background(), authTimeout) - handler := &CredentialHandler{ - pid: pid, - client: driver.self, - credential: driver.credential, + + var ( + authTimeout = defaultAuthenticationTimeout + ctx = driver.withAuthContext(context.TODO()) + handler = &CredentialHandler{ + pid: pid, + client: driver.self, + credential: driver.credential, + } + ) + + // check for authentication timeout override + if d, ok := auth.TimeoutFrom(ctx); ok { + authTimeout = d } - ctx = driver.withAuthContext(ctx) + + ctx, cancel := context.WithTimeout(ctx, authTimeout) ctx = auth.WithParentUPID(ctx, *driver.self) ch := make(chan error, 1) go func() { ch <- auth.Login(ctx, handler) }() + select { case <-ctx.Done(): <-ch @@ -855,7 +906,7 @@ func (driver *MesosSchedulerDriver) authenticate(pid *upid.UPID, authenticating case <-authenticating.done: cancel() <-ch - return authenticationCanceledError + return errAuthenticationCanceled case e := <-ch: cancel() return e @@ -901,6 +952,7 @@ func (driver *MesosSchedulerDriver) registerOnce() bool { failover bool pid *upid.UPID info *mesos.FrameworkInfo + ctx context.Context ) if func() bool { driver.eventLock.RLock() @@ -914,6 +966,7 @@ func (driver *MesosSchedulerDriver) registerOnce() bool { failover = driver.failover pid = driver.masterPid info = proto.Clone(driver.frameworkInfo).(*mesos.FrameworkInfo) + ctx = driver.context() return true }() { // register framework @@ -931,7 +984,7 @@ func (driver *MesosSchedulerDriver) registerOnce() bool { Framework: info, } } - if err := driver.send(pid, message); err != nil { + if err := driver.send(ctx, pid, message); err != nil { log.Errorf("failed to send RegisterFramework message: %v", err) if _, err = driver.Stop(failover); err != nil { log.Errorf("failed to stop scheduler driver: %v", err) @@ -982,15 +1035,15 @@ waitForDeath: func (driver *MesosSchedulerDriver) Run() (mesos.Status, error) { driver.eventLock.Lock() defer driver.eventLock.Unlock() - return driver.run() + return driver.run(driver.context()) } // run expected to be guarded by eventLock -func (driver *MesosSchedulerDriver) run() (mesos.Status, error) { +func (driver *MesosSchedulerDriver) run(ctx context.Context) (mesos.Status, error) { stat, err := driver.start() if err != nil { - return driver.stop(err, false) + return driver.stop(ctx, err, false) } if stat != mesos.Status_DRIVER_RUNNING { @@ -1005,11 +1058,11 @@ func (driver *MesosSchedulerDriver) run() (mesos.Status, error) { func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) { driver.eventLock.Lock() defer driver.eventLock.Unlock() - return driver.stop(nil, failover) + return driver.stop(driver.context(), nil, failover) } // stop expects to be guarded by eventLock -func (driver *MesosSchedulerDriver) stop(cause error, failover bool) (mesos.Status, error) { +func (driver *MesosSchedulerDriver) stop(ctx context.Context, cause error, failover bool) (mesos.Status, error) { log.Infoln("Stopping the scheduler driver") if stat := driver.status; stat != mesos.Status_DRIVER_RUNNING { return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat) @@ -1024,7 +1077,7 @@ func (driver *MesosSchedulerDriver) stop(cause error, failover bool) (mesos.Stat //TODO(jdef) this is actually a little racy: we send an 'unregister' message but then // immediately afterward the messenger is stopped in driver._stop(). so the unregister message // may not actually end up being sent out. - if err := driver.send(driver.masterPid, message); err != nil { + if err := driver.send(ctx, driver.masterPid, message); err != nil { log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err) if cause == nil { cause = &ErrDriverAborted{} @@ -1048,17 +1101,23 @@ func (driver *MesosSchedulerDriver) _stop(cause error, stopStatus mesos.Status) default: } close(driver.stopCh) - if cause != nil { - log.V(1).Infof("Sending error via withScheduler: %v", cause) - driver.withScheduler(func(s Scheduler) { s.Error(driver, cause.Error()) }) - } else { - // send a noop func, withScheduler needs to see that stopCh is closed - log.V(1).Infof("Sending kill signal to withScheduler") - driver.withScheduler(func(_ Scheduler) {}) - } + // decouple to avoid deadlock (avoid nested withScheduler() invocations) + go func() { + driver.eventLock.Lock() + defer driver.eventLock.Unlock() + if cause != nil { + log.V(1).Infof("Sending error via withScheduler: %v", cause) + driver.withScheduler(func(s Scheduler) { s.Error(driver, cause.Error()) }) + } else { + // send a noop func, withScheduler needs to see that stopCh is closed + log.V(1).Infof("Sending kill signal to withScheduler") + driver.withScheduler(func(_ Scheduler) {}) + } + }() }() driver.status = stopStatus driver.connected = false + driver.connection = uuid.UUID{} log.Info("stopping messenger") err := driver.messenger.Stop() @@ -1070,11 +1129,11 @@ func (driver *MesosSchedulerDriver) _stop(cause error, stopStatus mesos.Status) func (driver *MesosSchedulerDriver) Abort() (stat mesos.Status, err error) { driver.eventLock.Lock() defer driver.eventLock.Unlock() - return driver.abort(nil) + return driver.abort(driver.context(), nil) } // abort expects to be guarded by eventLock -func (driver *MesosSchedulerDriver) abort(cause error) (stat mesos.Status, err error) { +func (driver *MesosSchedulerDriver) abort(ctx context.Context, cause error) (stat mesos.Status, err error) { if driver.masterDetector != nil { defer driver.masterDetector.Cancel() } @@ -1082,7 +1141,7 @@ func (driver *MesosSchedulerDriver) abort(cause error) (stat mesos.Status, err e log.Infof("Aborting framework [%+v]", driver.frameworkInfo.Id) if driver.connected { - _, err = driver.stop(cause, true) + _, err = driver.stop(ctx, cause, true) } else { driver._stop(cause, mesos.Status_DRIVER_ABORTED) } @@ -1100,13 +1159,21 @@ func (driver *MesosSchedulerDriver) AcceptOffers(offerIds []*mesos.OfferID, oper return stat, fmt.Errorf("Unable to AcceptOffers, expected driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, stat) } + ctx := driver.context() if !driver.connected { - err := fmt.Errorf("Not connected to master.") + err := ErrDisconnected for _, operation := range operations { if *operation.Type == mesos.Offer_Operation_LAUNCH { - for _, task := range operation.Launch.TaskInfos { - driver.pushLostTask(task, "Unable to launch tasks: "+err.Error()) - } + // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations) + operation := operation + go func() { + driver.eventLock.Lock() + defer driver.eventLock.Unlock() + + for _, task := range operation.Launch.TaskInfos { + driver.pushLostTask(ctx, task, err.Error()) + } + }() } } log.Errorf("Failed to send LaunchTask message: %v\n", err) @@ -1188,12 +1255,19 @@ func (driver *MesosSchedulerDriver) AcceptOffers(offerIds []*mesos.OfferID, oper }, } - if err := driver.send(driver.masterPid, message); err != nil { + if err := driver.send(ctx, driver.masterPid, message); err != nil { for _, operation := range operations { if *operation.Type == mesos.Offer_Operation_LAUNCH { - for _, task := range operation.Launch.TaskInfos { - driver.pushLostTask(task, "Unable to launch tasks: "+err.Error()) - } + // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations) + operation := operation + go func() { + driver.eventLock.Lock() + defer driver.eventLock.Unlock() + + for _, task := range operation.Launch.TaskInfos { + driver.pushLostTask(ctx, task, "Unable to launch tasks: "+err.Error()) + } + }() } } log.Errorf("Failed to send LaunchTask message: %v\n", err) @@ -1211,15 +1285,24 @@ func (driver *MesosSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, tasks return stat, fmt.Errorf("Unable to LaunchTasks, expected driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, stat) } + ctx := driver.context() + // Launch tasks if !driver.connected { log.Infoln("Ignoring LaunchTasks message, disconnected from master.") - // Send statusUpdate with status=TASK_LOST for each task. - // See sched.cpp L#823 - for _, task := range tasks { - driver.pushLostTask(task, "Master is disconnected") - } - return driver.status, fmt.Errorf("Not connected to master. Tasks marked as lost.") + // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations) + err := ErrDisconnected + go func() { + driver.eventLock.Lock() + defer driver.eventLock.Unlock() + + // Send statusUpdate with status=TASK_LOST for each task. + // See sched.cpp L#823 + for _, task := range tasks { + driver.pushLostTask(ctx, task, err.Error()) + } + }() + return driver.status, err } okTasks := make([]*mesos.TaskInfo, 0, len(tasks)) @@ -1260,10 +1343,16 @@ func (driver *MesosSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, tasks Filters: filters, } - if err := driver.send(driver.masterPid, message); err != nil { - for _, task := range tasks { - driver.pushLostTask(task, "Unable to launch tasks: "+err.Error()) - } + if err := driver.send(ctx, driver.masterPid, message); err != nil { + // decouple lost task processing to avoid deadlock (avoid nested withScheduler() invocations) + go func() { + driver.eventLock.Lock() + defer driver.eventLock.Unlock() + + for _, task := range tasks { + driver.pushLostTask(ctx, task, "Unable to launch tasks: "+err.Error()) + } + }() log.Errorf("Failed to send LaunchTask message: %v\n", err) return driver.status, err } @@ -1272,7 +1361,7 @@ func (driver *MesosSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, tasks } // pushLostTask expects to be guarded by eventLock -func (driver *MesosSchedulerDriver) pushLostTask(taskInfo *mesos.TaskInfo, why string) { +func (driver *MesosSchedulerDriver) pushLostTask(ctx context.Context, taskInfo *mesos.TaskInfo, why string) { msg := &mesos.StatusUpdateMessage{ Update: &mesos.StatusUpdate{ FrameworkId: driver.frameworkInfo.Id, @@ -1292,7 +1381,7 @@ func (driver *MesosSchedulerDriver) pushLostTask(taskInfo *mesos.TaskInfo, why s // put it on internal chanel // will cause handler to push to attached Scheduler - driver.statusUpdated(driver.self, msg) + driver.statusUpdated(ctx, driver.self, msg) } func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status, error) { @@ -1305,7 +1394,7 @@ func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status if !driver.connected { log.Infoln("Ignoring kill task message, disconnected from master.") - return driver.status, fmt.Errorf("Not connected to master") + return driver.status, ErrDisconnected } message := &mesos.KillTaskMessage{ @@ -1313,7 +1402,7 @@ func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status TaskId: taskId, } - if err := driver.send(driver.masterPid, message); err != nil { + if err := driver.send(driver.context(), driver.masterPid, message); err != nil { log.Errorf("Failed to send KillTask message: %v\n", err) return driver.status, err } @@ -1331,7 +1420,7 @@ func (driver *MesosSchedulerDriver) RequestResources(requests []*mesos.Request) if !driver.connected { log.Infoln("Ignoring request resource message, disconnected from master.") - return driver.status, fmt.Errorf("Not connected to master") + return driver.status, ErrDisconnected } message := &mesos.ResourceRequestMessage{ @@ -1339,7 +1428,7 @@ func (driver *MesosSchedulerDriver) RequestResources(requests []*mesos.Request) Requests: requests, } - if err := driver.send(driver.masterPid, message); err != nil { + if err := driver.send(driver.context(), driver.masterPid, message); err != nil { log.Errorf("Failed to send ResourceRequest message: %v\n", err) return driver.status, err } @@ -1362,13 +1451,13 @@ func (driver *MesosSchedulerDriver) ReviveOffers() (mesos.Status, error) { } if !driver.connected { log.Infoln("Ignoring revive offers message, disconnected from master.") - return driver.status, fmt.Errorf("Not connected to master.") + return driver.status, ErrDisconnected } message := &mesos.ReviveOffersMessage{ FrameworkId: driver.frameworkInfo.Id, } - if err := driver.send(driver.masterPid, message); err != nil { + if err := driver.send(driver.context(), driver.masterPid, message); err != nil { log.Errorf("Failed to send ReviveOffers message: %v\n", err) return driver.status, err } @@ -1385,7 +1474,7 @@ func (driver *MesosSchedulerDriver) SendFrameworkMessage(executorId *mesos.Execu } if !driver.connected { log.Infoln("Ignoring send framework message, disconnected from master.") - return driver.status, fmt.Errorf("Not connected to master") + return driver.status, ErrDisconnected } message := &mesos.FrameworkToExecutorMessage{ @@ -1401,13 +1490,13 @@ func (driver *MesosSchedulerDriver) SendFrameworkMessage(executorId *mesos.Execu if slavePid.Equal(driver.self) { return driver.status, nil } - if err := driver.send(slavePid, message); err != nil { + if err := driver.send(driver.context(), slavePid, message); err != nil { log.Errorf("Failed to send framework to executor message: %v\n", err) return driver.status, err } } else { // slavePid not cached, send to master. - if err := driver.send(driver.masterPid, message); err != nil { + if err := driver.send(driver.context(), driver.masterPid, message); err != nil { log.Errorf("Failed to send framework to executor message: %v\n", err) return driver.status, err } @@ -1425,14 +1514,14 @@ func (driver *MesosSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus) } if !driver.connected { log.Infoln("Ignoring send Reconcile Tasks message, disconnected from master.") - return driver.status, fmt.Errorf("Not connected to master.") + return driver.status, ErrDisconnected } message := &mesos.ReconcileTasksMessage{ FrameworkId: driver.frameworkInfo.Id, Statuses: statuses, } - if err := driver.send(driver.masterPid, message); err != nil { + if err := driver.send(driver.context(), driver.masterPid, message); err != nil { log.Errorf("Failed to send reconcile tasks message: %v\n", err) return driver.status, err } @@ -1441,10 +1530,10 @@ func (driver *MesosSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus) } // error expects to be guarded by eventLock -func (driver *MesosSchedulerDriver) error(err string) { +func (driver *MesosSchedulerDriver) error(ctx context.Context, err string) { if driver.status == mesos.Status_DRIVER_ABORTED { log.V(3).Infoln("Ignoring error message, the driver is aborted!") return } - driver.abort(&ErrDriverAborted{Reason: err}) + driver.abort(ctx, &ErrDriverAborted{Reason: err}) }