fixup godep

This commit is contained in:
deads2k 2016-09-28 15:29:31 -04:00
parent c0826a2e7e
commit 8b56981e0e
34 changed files with 0 additions and 13490 deletions

55
Godeps/Godeps.json generated
View File

@ -1503,31 +1503,6 @@
"ImportPath": "github.com/matttproud/golang_protobuf_extensions/pbutil",
"Rev": "fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a"
},
{
"ImportPath": "github.com/mesos/mesos-go/auth",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/auth/callback",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/auth/sasl",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/auth/sasl/mech",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/auth/sasl/mech/crammd5",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/detector",
"Comment": "before-0.26-protos-33-g45c8b08",
@ -1538,46 +1513,16 @@
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/executor",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosproto",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosproto/scheduler",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosutil",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosutil/process",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/messenger",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/messenger/sessionid",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/scheduler",
"Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
},
{
"ImportPath": "github.com/mesos/mesos-go/upid",
"Comment": "before-0.26-protos-33-g45c8b08",

View File

@ -1,28 +0,0 @@
package callback
import (
"fmt"
)
type Unsupported struct {
Callback Interface
}
func (uc *Unsupported) Error() string {
return fmt.Sprintf("Unsupported callback <%T>: %v", uc.Callback, uc.Callback)
}
type Interface interface {
// marker interface
}
type Handler interface {
// may return an Unsupported error on failure
Handle(callbacks ...Interface) error
}
type HandlerFunc func(callbacks ...Interface) error
func (f HandlerFunc) Handle(callbacks ...Interface) error {
return f(callbacks...)
}

View File

@ -1,27 +0,0 @@
package callback
import (
"github.com/mesos/mesos-go/upid"
)
type Interprocess struct {
client upid.UPID
server upid.UPID
}
func NewInterprocess() *Interprocess {
return &Interprocess{}
}
func (cb *Interprocess) Client() upid.UPID {
return cb.client
}
func (cb *Interprocess) Server() upid.UPID {
return cb.server
}
func (cb *Interprocess) Set(server, client upid.UPID) {
cb.server = server
cb.client = client
}

View File

@ -1,17 +0,0 @@
package callback
type Name struct {
name string
}
func NewName() *Name {
return &Name{}
}
func (cb *Name) Get() string {
return cb.name
}
func (cb *Name) Set(name string) {
cb.name = name
}

View File

@ -1,20 +0,0 @@
package callback
type Password struct {
password []byte
}
func NewPassword() *Password {
return &Password{}
}
func (cb *Password) Get() []byte {
clone := make([]byte, len(cb.password))
copy(clone, cb.password)
return clone
}
func (cb *Password) Set(password []byte) {
cb.password = make([]byte, len(password))
copy(cb.password, password)
}

View File

@ -1,63 +0,0 @@
package auth
import (
"errors"
"fmt"
"sync"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/auth/callback"
"golang.org/x/net/context"
)
// SPI interface: login provider implementations support this interface, clients
// do not authenticate against this directly, instead they should use Login()
type Authenticatee interface {
// Returns no errors if successfully authenticated, otherwise a single
// error.
Authenticate(ctx context.Context, handler callback.Handler) error
}
// Func adapter for interface: allow func's to implement the Authenticatee interface
// as long as the func signature matches
type AuthenticateeFunc func(ctx context.Context, handler callback.Handler) error
func (f AuthenticateeFunc) Authenticate(ctx context.Context, handler callback.Handler) error {
return f(ctx, handler)
}
var (
// Authentication was attempted and failed (likely due to incorrect credentials, too
// many retries within a time window, etc). Distinctly different from authentication
// errors (e.g. network errors, configuration errors, etc).
AuthenticationFailed = errors.New("authentication failed")
authenticateeProviders = make(map[string]Authenticatee) // authentication providers dict
providerLock sync.Mutex
)
// Register an authentication provider (aka "login provider"). packages that
// provide Authenticatee implementations should invoke this func in their
// init() to register.
func RegisterAuthenticateeProvider(name string, auth Authenticatee) (err error) {
providerLock.Lock()
defer providerLock.Unlock()
if _, found := authenticateeProviders[name]; found {
err = fmt.Errorf("authentication provider already registered: %v", name)
} else {
authenticateeProviders[name] = auth
log.V(1).Infof("registered authentication provider: %v", name)
}
return
}
// Look up an authentication provider by name, returns non-nil and true if such
// a provider is found.
func getAuthenticateeProvider(name string) (provider Authenticatee, ok bool) {
providerLock.Lock()
defer providerLock.Unlock()
provider, ok = authenticateeProviders[name]
return
}

View File

@ -1,100 +0,0 @@
package auth
import (
"errors"
"fmt"
"time"
"github.com/mesos/mesos-go/auth/callback"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
var (
// No login provider name has been specified in a context.Context
NoLoginProviderName = errors.New("missing login provider name in context")
)
// Main client entrypoint into the authentication APIs: clients are expected to
// invoke this func with a context containing a login provider name value.
// This may be written as:
// providerName := ... // the user has probably configured this via some flag
// handler := ... // handlers provide data like usernames and passwords
// ctx := ... // obtain some initial or timed context
// err := auth.Login(auth.WithLoginProvider(ctx, providerName), handler)
func Login(ctx context.Context, handler callback.Handler) error {
name, ok := LoginProviderFrom(ctx)
if !ok {
return NoLoginProviderName
}
provider, ok := getAuthenticateeProvider(name)
if !ok {
return fmt.Errorf("unrecognized login provider name in context: %s", name)
}
return provider.Authenticate(ctx, handler)
}
// Unexported key type, avoids conflicts with other context-using packages. All
// context items registered from this package should use keys of this type.
type loginKeyType int
const (
// name of login provider to use
loginProviderNameKey loginKeyType = iota
// upid.UPID of some parent process
parentUpidKey
// time.Duration that limits the overall duration of an auth attempt
timeoutKey
)
// Return a context that inherits all values from the parent ctx and specifies
// the login provider name given here. Intended to be invoked before calls to
// Login().
func WithLoginProvider(ctx context.Context, providerName string) context.Context {
return context.WithValue(ctx, loginProviderNameKey, providerName)
}
// Return the name of the login provider specified in this context.
func LoginProviderFrom(ctx context.Context) (name string, ok bool) {
name, ok = ctx.Value(loginProviderNameKey).(string)
return
}
// Return the name of the login provider specified in this context, or empty
// string if none.
func LoginProvider(ctx context.Context) string {
name, _ := LoginProviderFrom(ctx)
return name
}
func WithParentUPID(ctx context.Context, pid upid.UPID) context.Context {
return context.WithValue(ctx, parentUpidKey, pid)
}
func ParentUPIDFrom(ctx context.Context) (pid upid.UPID, ok bool) {
pid, ok = ctx.Value(parentUpidKey).(upid.UPID)
return
}
func ParentUPID(ctx context.Context) (upid *upid.UPID) {
if upid, ok := ParentUPIDFrom(ctx); ok {
return &upid
}
return nil
}
func TimeoutFrom(ctx context.Context) (d time.Duration, ok bool) {
d, ok = ctx.Value(timeoutKey).(time.Duration)
return
}
func Timeout(ctx context.Context) (d time.Duration) {
d, _ = TimeoutFrom(ctx)
return
}
func WithTimeout(ctx context.Context, d time.Duration) context.Context {
return context.WithValue(ctx, timeoutKey, d)
}

View File

@ -1,358 +0,0 @@
package sasl
import (
"errors"
"fmt"
"sync/atomic"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/auth"
"github.com/mesos/mesos-go/auth/callback"
"github.com/mesos/mesos-go/auth/sasl/mech"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
var (
UnexpectedAuthenticationMechanisms = errors.New("Unexpected authentication 'mechanisms' received")
UnexpectedAuthenticationStep = errors.New("Unexpected authentication 'step' received")
UnexpectedAuthenticationCompleted = errors.New("Unexpected authentication 'completed' received")
UnexpectedAuthenticatorPid = errors.New("Unexpected authentator pid") // authenticator pid changed mid-process
UnsupportedMechanism = errors.New("failed to identify a compatible mechanism")
)
type statusType int32
const (
statusReady statusType = iota
statusStarting
statusStepping
_statusTerminal // meta status, should never be assigned: all status types following are "terminal"
statusCompleted
statusFailed
statusError
statusDiscarded
// this login provider name is automatically registered with the auth package; see init()
ProviderName = "SASL"
)
type authenticateeProcess struct {
transport messenger.Messenger
client upid.UPID
status statusType
done chan struct{}
err error
mech mech.Interface
stepFn mech.StepFunc
from *upid.UPID
handler callback.Handler
}
type authenticateeConfig struct {
client upid.UPID // pid of the client we're attempting to authenticate
handler callback.Handler
transport messenger.Messenger // mesos communications transport
}
type transportFactory interface {
makeTransport() messenger.Messenger
}
type transportFactoryFunc func() messenger.Messenger
func (f transportFactoryFunc) makeTransport() messenger.Messenger {
return f()
}
func init() {
factory := func(ctx context.Context) transportFactoryFunc {
return transportFactoryFunc(func() messenger.Messenger {
parent := auth.ParentUPID(ctx)
if parent == nil {
log.Fatal("expected to have a parent UPID in context")
}
process := process.New("sasl_authenticatee")
tpid := upid.UPID{
ID: process.Label(),
Host: parent.Host,
}
return messenger.NewHttpWithBindingAddress(tpid, BindingAddressFrom(ctx))
})
}
delegate := auth.AuthenticateeFunc(func(ctx context.Context, handler callback.Handler) error {
if impl, err := makeAuthenticatee(handler, factory(ctx)); err != nil {
return err
} else {
return impl.Authenticate(ctx, handler)
}
})
if err := auth.RegisterAuthenticateeProvider(ProviderName, delegate); err != nil {
log.Error(err)
}
}
func (s *statusType) get() statusType {
return statusType(atomic.LoadInt32((*int32)(s)))
}
func (s *statusType) swap(old, new statusType) bool {
return old != new && atomic.CompareAndSwapInt32((*int32)(s), int32(old), int32(new))
}
// build a new authenticatee implementation using the given callbacks and a new transport instance
func makeAuthenticatee(handler callback.Handler, factory transportFactory) (auth.Authenticatee, error) {
ip := callback.NewInterprocess()
if err := handler.Handle(ip); err != nil {
return nil, err
}
config := &authenticateeConfig{
client: ip.Client(),
handler: handler,
transport: factory.makeTransport(),
}
return auth.AuthenticateeFunc(func(ctx context.Context, handler callback.Handler) error {
ctx, auth := newAuthenticatee(ctx, config)
auth.authenticate(ctx, ip.Server())
select {
case <-ctx.Done():
return auth.discard(ctx)
case <-auth.done:
return auth.err
}
}), nil
}
// Terminate the authentication process upon context cancellation;
// only to be called if/when ctx.Done() has been signalled.
func (self *authenticateeProcess) discard(ctx context.Context) error {
err := ctx.Err()
status := statusFrom(ctx)
for ; status < _statusTerminal; status = (&self.status).get() {
if self.terminate(status, statusDiscarded, err) {
break
}
}
return err
}
func newAuthenticatee(ctx context.Context, config *authenticateeConfig) (context.Context, *authenticateeProcess) {
initialStatus := statusReady
proc := &authenticateeProcess{
transport: config.transport,
client: config.client,
handler: config.handler,
status: initialStatus,
done: make(chan struct{}),
}
ctx = withStatus(ctx, initialStatus)
err := proc.installHandlers(ctx)
if err == nil {
err = proc.startTransport()
}
if err != nil {
proc.terminate(initialStatus, statusError, err)
}
return ctx, proc
}
func (self *authenticateeProcess) startTransport() error {
if err := self.transport.Start(); err != nil {
return err
} else {
go func() {
// stop the authentication transport upon termination of the
// authenticator process
select {
case <-self.done:
log.V(2).Infof("stopping authenticator transport: %v", self.transport.UPID())
self.transport.Stop()
}
}()
}
return nil
}
// returns true when handlers are installed without error, otherwise terminates the
// authentication process.
func (self *authenticateeProcess) installHandlers(ctx context.Context) error {
type handlerFn func(ctx context.Context, from *upid.UPID, pbMsg proto.Message)
withContext := func(f handlerFn) messenger.MessageHandler {
return func(from *upid.UPID, m proto.Message) {
status := (&self.status).get()
if self.from != nil && !self.from.Equal(from) {
self.terminate(status, statusError, UnexpectedAuthenticatorPid)
} else {
f(withStatus(ctx, status), from, m)
}
}
}
// Anticipate mechanisms and steps from the server
handlers := []struct {
f handlerFn
m proto.Message
}{
{self.mechanisms, &mesos.AuthenticationMechanismsMessage{}},
{self.step, &mesos.AuthenticationStepMessage{}},
{self.completed, &mesos.AuthenticationCompletedMessage{}},
{self.failed, &mesos.AuthenticationFailedMessage{}},
{self.errored, &mesos.AuthenticationErrorMessage{}},
}
for _, h := range handlers {
if err := self.transport.Install(withContext(h.f), h.m); err != nil {
return err
}
}
return nil
}
// return true if the authentication status was updated (if true, self.done will have been closed)
func (self *authenticateeProcess) terminate(old, new statusType, err error) bool {
if (&self.status).swap(old, new) {
self.err = err
if self.mech != nil {
self.mech.Discard()
}
close(self.done)
return true
}
return false
}
func (self *authenticateeProcess) authenticate(ctx context.Context, pid upid.UPID) {
status := statusFrom(ctx)
if status != statusReady {
return
}
message := &mesos.AuthenticateMessage{
Pid: proto.String(self.client.String()),
}
if err := self.transport.Send(ctx, &pid, message); err != nil {
self.terminate(status, statusError, err)
} else {
(&self.status).swap(status, statusStarting)
}
}
func (self *authenticateeProcess) mechanisms(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
status := statusFrom(ctx)
if status != statusStarting {
self.terminate(status, statusError, UnexpectedAuthenticationMechanisms)
return
}
msg, ok := pbMsg.(*mesos.AuthenticationMechanismsMessage)
if !ok {
self.terminate(status, statusError, fmt.Errorf("Expected AuthenticationMechanismsMessage, not %T", pbMsg))
return
}
mechanisms := msg.GetMechanisms()
log.Infof("Received SASL authentication mechanisms: %v", mechanisms)
selectedMech, factory := mech.SelectSupported(mechanisms)
if selectedMech == "" {
self.terminate(status, statusError, UnsupportedMechanism)
return
}
if m, f, err := factory(self.handler); err != nil {
self.terminate(status, statusError, err)
return
} else {
self.mech = m
self.stepFn = f
self.from = from
}
// execute initialization step...
nextf, data, err := self.stepFn(self.mech, nil)
if err != nil {
self.terminate(status, statusError, err)
return
} else {
self.stepFn = nextf
}
message := &mesos.AuthenticationStartMessage{
Mechanism: proto.String(selectedMech),
Data: data, // may be nil, depends on init step
}
if err := self.transport.Send(ctx, from, message); err != nil {
self.terminate(status, statusError, err)
} else {
(&self.status).swap(status, statusStepping)
}
}
func (self *authenticateeProcess) step(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
status := statusFrom(ctx)
if status != statusStepping {
self.terminate(status, statusError, UnexpectedAuthenticationStep)
return
}
log.Info("Received SASL authentication step")
msg, ok := pbMsg.(*mesos.AuthenticationStepMessage)
if !ok {
self.terminate(status, statusError, fmt.Errorf("Expected AuthenticationStepMessage, not %T", pbMsg))
return
}
input := msg.GetData()
fn, output, err := self.stepFn(self.mech, input)
if err != nil {
self.terminate(status, statusError, fmt.Errorf("failed to perform authentication step: %v", err))
return
}
self.stepFn = fn
// We don't start the client with SASL_SUCCESS_DATA so we may
// need to send one more "empty" message to the server.
message := &mesos.AuthenticationStepMessage{}
if len(output) > 0 {
message.Data = output
}
if err := self.transport.Send(ctx, from, message); err != nil {
self.terminate(status, statusError, err)
}
}
func (self *authenticateeProcess) completed(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
status := statusFrom(ctx)
if status != statusStepping {
self.terminate(status, statusError, UnexpectedAuthenticationCompleted)
return
}
log.Info("Authentication success")
self.terminate(status, statusCompleted, nil)
}
func (self *authenticateeProcess) failed(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
status := statusFrom(ctx)
self.terminate(status, statusFailed, auth.AuthenticationFailed)
}
func (self *authenticateeProcess) errored(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
var err error
if msg, ok := pbMsg.(*mesos.AuthenticationErrorMessage); !ok {
err = fmt.Errorf("Expected AuthenticationErrorMessage, not %T", pbMsg)
} else {
err = fmt.Errorf("Authentication error: %s", msg.GetError())
}
status := statusFrom(ctx)
self.terminate(status, statusError, err)
}

View File

@ -1,43 +0,0 @@
package sasl
import (
"net"
"golang.org/x/net/context"
)
// unexported to prevent collisions with context keys defined in
// other packages.
type _key int
// If this package defined other context keys, they would have
// different integer values.
const (
statusKey _key = iota
bindingAddressKey // bind address for login-related network ops
)
func withStatus(ctx context.Context, s statusType) context.Context {
return context.WithValue(ctx, statusKey, s)
}
func statusFrom(ctx context.Context) statusType {
s, ok := ctx.Value(statusKey).(statusType)
if !ok {
panic("missing status in context")
}
return s
}
func WithBindingAddress(ctx context.Context, address net.IP) context.Context {
return context.WithValue(ctx, bindingAddressKey, address)
}
func BindingAddressFrom(ctx context.Context) net.IP {
obj := ctx.Value(bindingAddressKey)
if addr, ok := obj.(net.IP); ok {
return addr
} else {
return nil
}
}

View File

@ -1,72 +0,0 @@
package crammd5
import (
"crypto/hmac"
"crypto/md5"
"encoding/hex"
"errors"
"io"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/auth/callback"
"github.com/mesos/mesos-go/auth/sasl/mech"
)
var (
Name = "CRAM-MD5" // name this mechanism is registered with
//TODO(jdef) is this a generic SASL error? if so, move it up to mech
challengeDataRequired = errors.New("challenge data may not be empty")
)
func init() {
mech.Register(Name, newInstance)
}
type mechanism struct {
handler callback.Handler
}
func (m *mechanism) Handler() callback.Handler {
return m.handler
}
func (m *mechanism) Discard() {
// noop
}
func newInstance(h callback.Handler) (mech.Interface, mech.StepFunc, error) {
m := &mechanism{
handler: h,
}
fn := func(m mech.Interface, data []byte) (mech.StepFunc, []byte, error) {
// noop: no initialization needed
return challengeResponse, nil, nil
}
return m, fn, nil
}
// algorithm lifted from wikipedia: http://en.wikipedia.org/wiki/CRAM-MD5
// except that the SASL mechanism used by Mesos doesn't leverage base64 encoding
func challengeResponse(m mech.Interface, data []byte) (mech.StepFunc, []byte, error) {
if len(data) == 0 {
return mech.IllegalState, nil, challengeDataRequired
}
decoded := string(data)
log.V(4).Infof("challenge(decoded): %s", decoded) // for deep debugging only
username := callback.NewName()
secret := callback.NewPassword()
if err := m.Handler().Handle(username, secret); err != nil {
return mech.IllegalState, nil, err
}
hash := hmac.New(md5.New, secret.Get())
if _, err := io.WriteString(hash, decoded); err != nil {
return mech.IllegalState, nil, err
}
codes := hex.EncodeToString(hash.Sum(nil))
msg := username.Get() + " " + codes
return nil, []byte(msg), nil
}

View File

@ -1,33 +0,0 @@
package mech
import (
"errors"
"github.com/mesos/mesos-go/auth/callback"
)
var (
IllegalStateErr = errors.New("illegal mechanism state")
)
type Interface interface {
Handler() callback.Handler
Discard() // clean up resources or sensitive information; idempotent
}
// return a mechanism and it's initialization step (may be a noop that returns
// a nil data blob and handle to the first "real" challenge step).
type Factory func(h callback.Handler) (Interface, StepFunc, error)
// StepFunc implementations should never return a nil StepFunc result. This
// helps keep the logic in the SASL authticatee simpler: step functions are
// never nil. Mechanisms that end up an error state (for example, some decoding
// logic fails...) should return a StepFunc that represents an error state.
// Some mechanisms may be able to recover from such.
type StepFunc func(m Interface, data []byte) (StepFunc, []byte, error)
// reflects an unrecoverable, illegal mechanism state; always returns IllegalState
// as the next step along with an IllegalStateErr
func IllegalState(m Interface, data []byte) (StepFunc, []byte, error) {
return IllegalState, nil, IllegalStateErr
}

View File

@ -1,49 +0,0 @@
package mech
import (
"fmt"
"sync"
log "github.com/golang/glog"
)
var (
mechLock sync.Mutex
supportedMechs = make(map[string]Factory)
)
func Register(name string, f Factory) error {
mechLock.Lock()
defer mechLock.Unlock()
if _, found := supportedMechs[name]; found {
return fmt.Errorf("Mechanism registered twice: %s", name)
}
supportedMechs[name] = f
log.V(1).Infof("Registered mechanism %s", name)
return nil
}
func ListSupported() (list []string) {
mechLock.Lock()
defer mechLock.Unlock()
for mechname := range supportedMechs {
list = append(list, mechname)
}
return list
}
func SelectSupported(mechanisms []string) (selectedMech string, factory Factory) {
mechLock.Lock()
defer mechLock.Unlock()
for _, m := range mechanisms {
if f, ok := supportedMechs[m]; ok {
selectedMech = m
factory = f
break
}
}
return
}

View File

@ -1,5 +0,0 @@
/*
Package executor includes the interfaces of the mesos executor and
the mesos executor driver, as well as an implementation of the driver.
*/
package executor

View File

@ -1,142 +0,0 @@
package executor
import (
"github.com/mesos/mesos-go/mesosproto"
)
/**
* Executor callback interface to be implemented by frameworks' executors. Note
* that only one callback will be invoked at a time, so it is not
* recommended that you block within a callback because it may cause a
* deadlock.
*
* Each callback includes an instance to the executor driver that was
* used to run this executor. The driver will not change for the
* duration of an executor (i.e., from the point you do
* ExecutorDriver.Start() to the point that ExecutorDriver.Join()
* returns). This is intended for convenience so that an executor
* doesn't need to store a pointer to the driver itself.
*/
type Executor interface {
/**
* Invoked once the executor driver has been able to successfully
* connect with Mesos. In particular, a scheduler can pass some
* data to its executors through the FrameworkInfo.ExecutorInfo's
* data field.
*/
Registered(ExecutorDriver, *mesosproto.ExecutorInfo, *mesosproto.FrameworkInfo, *mesosproto.SlaveInfo)
/**
* Invoked when the executor re-registers with a restarted slave.
*/
Reregistered(ExecutorDriver, *mesosproto.SlaveInfo)
/**
* Invoked when the executor becomes "disconnected" from the slave
* (e.g., the slave is being restarted due to an upgrade).
*/
Disconnected(ExecutorDriver)
/**
* Invoked when a task has been launched on this executor (initiated
* via SchedulerDriver.LaunchTasks). Note that this task can be realized
* with a goroutine, an external process, or some simple computation, however,
* no other callbacks will be invoked on this executor until this
* callback has returned.
*/
LaunchTask(ExecutorDriver, *mesosproto.TaskInfo)
/**
* Invoked when a task running within this executor has been killed
* (via SchedulerDriver.KillTask). Note that no status update will
* be sent on behalf of the executor, the executor is responsible
* for creating a new TaskStatus (i.e., with TASK_KILLED) and
* invoking ExecutorDriver.SendStatusUpdate.
*/
KillTask(ExecutorDriver, *mesosproto.TaskID)
/**
* Invoked when a framework message has arrived for this
* executor. These messages are best effort; do not expect a
* framework message to be retransmitted in any reliable fashion.
*/
FrameworkMessage(ExecutorDriver, string)
/**
* Invoked when the executor should terminate all of its currently
* running tasks. Note that after Mesos has determined that an
* executor has terminated, any tasks that the executor did not send
* terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED,
* TASK_FAILED, etc) a TASK_LOST status update will be created.
*/
Shutdown(ExecutorDriver)
/**
* Invoked when a fatal error has occured with the executor and/or
* executor driver. The driver will be aborted BEFORE invoking this
* callback.
*/
Error(ExecutorDriver, string)
}
/**
* ExecutorDriver interface for connecting an executor to Mesos. This
* interface is used both to manage the executor's lifecycle (start
* it, stop it, or wait for it to finish) and to interact with Mesos
* (e.g., send status updates, send framework messages, etc.).
* A driver method is expected to fail-fast and return an error when possible.
* Other internal errors (or remote error) that occur asynchronously are handled
* using the the Executor.Error() callback.
*/
type ExecutorDriver interface {
/**
* Starts the executor driver. This needs to be called before any
* other driver calls are made.
*/
Start() (mesosproto.Status, error)
/**
* Stops the executor driver.
*/
Stop() (mesosproto.Status, error)
/**
* Aborts the driver so that no more callbacks can be made to the
* executor. The semantics of abort and stop have deliberately been
* separated so that code can detect an aborted driver (i.e., via
* the return status of ExecutorDriver.Join, see below), and
* instantiate and start another driver if desired (from within the
* same process ... although this functionality is currently not
* supported for executors).
*/
Abort() (mesosproto.Status, error)
/**
* Waits for the driver to be stopped or aborted, possibly
* blocking the calling goroutine indefinitely. The return status of
* this function can be used to determine if the driver was aborted
* (see package mesosproto for a description of Status).
*/
Join() (mesosproto.Status, error)
/**
* Starts and immediately joins (i.e., blocks on) the driver.
*/
Run() (mesosproto.Status, error)
/**
* Sends a status update to the framework scheduler, retrying as
* necessary until an acknowledgement has been received or the
* executor is terminated (in which case, a TASK_LOST status update
* will be sent). See Scheduler.StatusUpdate for more information
* about status update acknowledgements.
*/
SendStatusUpdate(*mesosproto.TaskStatus) (mesosproto.Status, error)
/**
* Sends a message to the framework scheduler. These messages are
* best effort; do not expect a framework message to be
* retransmitted in any reliable fashion.
*/
SendFrameworkMessage(string) (mesosproto.Status, error)
}

View File

@ -1,750 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package executor
import (
"fmt"
"net"
"os"
"sync"
"time"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/messenger/sessionid"
"github.com/mesos/mesos-go/upid"
"github.com/pborman/uuid"
"golang.org/x/net/context"
)
const (
defaultRecoveryTimeout = 15 * time.Minute
)
type DriverConfig struct {
Executor Executor
HostnameOverride string // optional
BindingAddress net.IP // optional
BindingPort uint16 // optional
PublishedAddress net.IP // optional
NewMessenger func() (messenger.Messenger, error) // optional
}
// MesosExecutorDriver is a implementation of the ExecutorDriver.
type MesosExecutorDriver struct {
lock sync.RWMutex
cond *sync.Cond
self *upid.UPID
stopCh chan struct{}
status mesosproto.Status
messenger messenger.Messenger
slaveUPID *upid.UPID
slaveID *mesosproto.SlaveID
frameworkID *mesosproto.FrameworkID
executorID *mesosproto.ExecutorID
workDir string
connected bool
connection uuid.UUID
local bool // TODO(yifan): Not used yet.
directory string // TODO(yifan): Not used yet.
checkpoint bool
recoveryTimeout time.Duration
recoveryTimer *time.Timer
updates map[string]*mesosproto.StatusUpdate // Key is a UUID string. TODO(yifan): Not used yet.
tasks map[string]*mesosproto.TaskInfo // Key is a UUID string. TODO(yifan): Not used yet.
withExecutor func(f func(e Executor))
started chan struct{} // signal chan that closes once start has been invoked
}
// NewMesosExecutorDriver creates a new mesos executor driver.
func NewMesosExecutorDriver(config DriverConfig) (*MesosExecutorDriver, error) {
if config.Executor == nil {
msg := "Executor callback interface cannot be nil."
log.Errorln(msg)
return nil, fmt.Errorf(msg)
}
hostname := mesosutil.GetHostname(config.HostnameOverride)
newMessenger := config.NewMessenger
if newMessenger == nil {
newMessenger = func() (messenger.Messenger, error) {
process := process.New("executor")
return messenger.ForHostname(process, hostname, config.BindingAddress, config.BindingPort, config.PublishedAddress)
}
}
driver := &MesosExecutorDriver{
status: mesosproto.Status_DRIVER_NOT_STARTED,
stopCh: make(chan struct{}),
updates: make(map[string]*mesosproto.StatusUpdate),
tasks: make(map[string]*mesosproto.TaskInfo),
workDir: ".",
started: make(chan struct{}),
recoveryTimeout: defaultRecoveryTimeout,
}
driver.cond = sync.NewCond(&driver.lock)
// decouple serialized executor callback execution from goroutines of this driver
var execLock sync.Mutex
driver.withExecutor = func(f func(e Executor)) {
go func() {
execLock.Lock()
defer execLock.Unlock()
f(config.Executor)
}()
}
var err error
if driver.messenger, err = newMessenger(); err != nil {
return nil, err
}
if err = driver.init(); err != nil {
log.Errorf("failed to initialize the driver: %v", err)
return nil, err
}
return driver, nil
}
// context returns the driver context, expects driver.lock to be locked
func (driver *MesosExecutorDriver) context() context.Context {
return sessionid.NewContext(context.TODO(), driver.connection.String())
}
// init initializes the driver.
func (driver *MesosExecutorDriver) init() error {
log.Infof("Init mesos executor driver\n")
log.Infof("Protocol Version: %v\n", mesosutil.MesosVersion)
// Parse environments.
if err := driver.parseEnviroments(); err != nil {
log.Errorf("Failed to parse environments: %v\n", err)
return err
}
type messageHandler func(context.Context, *upid.UPID, proto.Message)
guard := func(h messageHandler) messenger.MessageHandler {
return messenger.MessageHandler(func(from *upid.UPID, pbMsg proto.Message) {
driver.lock.Lock()
defer driver.lock.Unlock()
h(driver.context(), from, pbMsg)
})
}
// Install handlers.
driver.messenger.Install(guard(driver.registered), &mesosproto.ExecutorRegisteredMessage{})
driver.messenger.Install(guard(driver.reregistered), &mesosproto.ExecutorReregisteredMessage{})
driver.messenger.Install(guard(driver.reconnect), &mesosproto.ReconnectExecutorMessage{})
driver.messenger.Install(guard(driver.runTask), &mesosproto.RunTaskMessage{})
driver.messenger.Install(guard(driver.killTask), &mesosproto.KillTaskMessage{})
driver.messenger.Install(guard(driver.statusUpdateAcknowledgement), &mesosproto.StatusUpdateAcknowledgementMessage{})
driver.messenger.Install(guard(driver.frameworkMessage), &mesosproto.FrameworkToExecutorMessage{})
driver.messenger.Install(guard(driver.shutdown), &mesosproto.ShutdownExecutorMessage{})
driver.messenger.Install(guard(driver.frameworkError), &mesosproto.FrameworkErrorMessage{})
driver.messenger.Install(guard(driver.networkError), &mesosproto.InternalNetworkError{})
return nil
}
func (driver *MesosExecutorDriver) parseEnviroments() error {
var value string
value = os.Getenv("MESOS_LOCAL")
if len(value) > 0 {
driver.local = true
}
value = os.Getenv("MESOS_SLAVE_PID")
if len(value) == 0 {
return fmt.Errorf("Cannot find MESOS_SLAVE_PID in the environment")
}
upid, err := upid.Parse(value)
if err != nil {
log.Errorf("Cannot parse UPID %v\n", err)
return err
}
driver.slaveUPID = upid
value = os.Getenv("MESOS_SLAVE_ID")
driver.slaveID = &mesosproto.SlaveID{Value: proto.String(value)}
value = os.Getenv("MESOS_FRAMEWORK_ID")
driver.frameworkID = &mesosproto.FrameworkID{Value: proto.String(value)}
value = os.Getenv("MESOS_EXECUTOR_ID")
driver.executorID = &mesosproto.ExecutorID{Value: proto.String(value)}
value = os.Getenv("MESOS_DIRECTORY")
if len(value) > 0 {
driver.workDir = value
}
value = os.Getenv("MESOS_CHECKPOINT")
if value == "1" {
driver.checkpoint = true
}
/*
if driver.checkpoint {
value = os.Getenv("MESOS_RECOVERY_TIMEOUT")
}
// TODO(yifan): Parse the duration. For now just use default.
*/
return nil
}
// ------------------------- Accessors ----------------------- //
func (driver *MesosExecutorDriver) Status() mesosproto.Status {
driver.lock.RLock()
defer driver.lock.RUnlock()
return driver.status
}
func (driver *MesosExecutorDriver) Running() bool {
driver.lock.RLock()
defer driver.lock.RUnlock()
return driver.status == mesosproto.Status_DRIVER_RUNNING
}
func (driver *MesosExecutorDriver) stopped() bool {
return driver.status != mesosproto.Status_DRIVER_RUNNING
}
func (driver *MesosExecutorDriver) Connected() bool {
driver.lock.RLock()
defer driver.lock.RUnlock()
return driver.connected
}
// --------------------- Message Handlers --------------------- //
// networkError is invoked when there's a network-level error communicating with the mesos slave.
// The driver reacts by entering a "disconnected" state and invoking the Executor.Disconnected
// callback. The assumption is that if this driver was previously connected, and then there's a
// network error, then the slave process must be dying/dead. The native driver implementation makes
// this same assumption. I have some concerns that this may be a false-positive in some situations;
// some network errors (timeouts) may be indicative of something other than a dead slave process.
func (driver *MesosExecutorDriver) networkError(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Info("ignoring network error because aborted")
return
}
if driver.connected {
driver.connected = false
msg := pbMsg.(*mesosproto.InternalNetworkError)
session := msg.GetSession()
if session != driver.connection.String() {
log.V(1).Infoln("ignoring netwok error for disconnected/stale session")
return
}
if driver.checkpoint {
log.Infoln("slave disconnected, will wait for recovery")
driver.withExecutor(func(e Executor) { e.Disconnected(driver) })
if driver.recoveryTimer != nil {
driver.recoveryTimer.Stop()
}
t := time.NewTimer(driver.recoveryTimeout)
driver.recoveryTimer = t
go func() {
select {
case <-t.C:
// timer expired
driver.lock.Lock()
defer driver.lock.Unlock()
driver.recoveryTimedOut(session)
case <-driver.stopCh:
// driver stopped
return
}
}()
return
}
}
log.Infoln("slave exited ... shutting down")
driver.withExecutor(func(e Executor) { e.Shutdown(driver) }) // abnormal shutdown
driver.abort()
}
func (driver *MesosExecutorDriver) recoveryTimedOut(connection string) {
if driver.connected {
return
}
// ensure that connection ID's match otherwise we've been re-registered
if connection == driver.connection.String() {
log.Info("recovery timeout of %v exceeded; shutting down", driver.recoveryTimeout)
driver.shutdown(driver.context(), nil, nil)
}
}
func (driver *MesosExecutorDriver) registered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring registration message from slave because aborted")
return
}
log.Infoln("Executor driver registered")
msg := pbMsg.(*mesosproto.ExecutorRegisteredMessage)
slaveID := msg.GetSlaveId()
executorInfo := msg.GetExecutorInfo()
frameworkInfo := msg.GetFrameworkInfo()
slaveInfo := msg.GetSlaveInfo()
if driver.stopped() {
log.Infof("Ignoring registered message from slave %v, because the driver is stopped!\n", slaveID)
return
}
log.Infof("Registered on slave %v\n", slaveID)
driver.connected = true
driver.connection = uuid.NewUUID()
driver.cond.Broadcast() // useful for testing
driver.withExecutor(func(e Executor) { e.Registered(driver, executorInfo, frameworkInfo, slaveInfo) })
}
func (driver *MesosExecutorDriver) reregistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring reregistration message from slave because aborted")
return
}
log.Infoln("Executor driver reregistered")
msg := pbMsg.(*mesosproto.ExecutorReregisteredMessage)
slaveID := msg.GetSlaveId()
slaveInfo := msg.GetSlaveInfo()
if driver.stopped() {
log.Infof("Ignoring re-registered message from slave %v, because the driver is stopped!\n", slaveID)
return
}
log.Infof("Re-registered on slave %v\n", slaveID)
driver.connected = true
driver.connection = uuid.NewUUID()
driver.cond.Broadcast() // useful for testing
driver.withExecutor(func(e Executor) { e.Reregistered(driver, slaveInfo) })
}
func (driver *MesosExecutorDriver) send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
c := make(chan error, 1)
go func() { c <- driver.messenger.Send(ctx, upid, msg) }()
select {
case <-ctx.Done():
<-c // wait for Send(...)
return ctx.Err()
case err := <-c:
return err
}
}
func (driver *MesosExecutorDriver) reconnect(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring reconnect message from slave because aborted")
return
}
log.Infoln("Executor driver reconnect")
msg := pbMsg.(*mesosproto.ReconnectExecutorMessage)
slaveID := msg.GetSlaveId()
if driver.stopped() {
log.Infof("Ignoring reconnect message from slave %v, because the driver is stopped!\n", slaveID)
return
}
log.Infof("Received reconnect request from slave %v\n", slaveID)
driver.slaveUPID = from
message := &mesosproto.ReregisterExecutorMessage{
ExecutorId: driver.executorID,
FrameworkId: driver.frameworkID,
}
// Send all unacknowledged updates.
for _, u := range driver.updates {
message.Updates = append(message.Updates, u)
}
// Send all unacknowledged tasks.
for _, t := range driver.tasks {
message.Tasks = append(message.Tasks, t)
}
// Send the message.
if err := driver.send(ctx, driver.slaveUPID, message); err != nil {
log.Errorf("Failed to send %v: %v\n", message, err)
}
}
func (driver *MesosExecutorDriver) runTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring runTask message from slave because aborted")
return
}
log.Infoln("Executor driver runTask")
msg := pbMsg.(*mesosproto.RunTaskMessage)
task := msg.GetTask()
taskID := task.GetTaskId()
if driver.stopped() {
log.Infof("Ignoring run task message for task %v because the driver is stopped!\n", taskID)
return
}
if _, ok := driver.tasks[taskID.String()]; ok {
log.Fatalf("Unexpected duplicate task %v\n", taskID)
}
log.Infof("Executor asked to run task '%v'\n", taskID)
driver.tasks[taskID.String()] = task
driver.withExecutor(func(e Executor) { e.LaunchTask(driver, task) })
}
func (driver *MesosExecutorDriver) killTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring killTask message from slave because aborted")
return
}
log.Infoln("Executor driver killTask")
msg := pbMsg.(*mesosproto.KillTaskMessage)
taskID := msg.GetTaskId()
if driver.stopped() {
log.Infof("Ignoring kill task message for task %v, because the driver is stopped!\n", taskID)
return
}
log.Infof("Executor driver is asked to kill task '%v'\n", taskID)
driver.withExecutor(func(e Executor) { e.KillTask(driver, taskID) })
}
func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring status update ack message because aborted")
return
}
log.Infoln("Executor statusUpdateAcknowledgement")
msg := pbMsg.(*mesosproto.StatusUpdateAcknowledgementMessage)
log.Infof("Receiving status update acknowledgement %v", msg)
frameworkID := msg.GetFrameworkId()
taskID := msg.GetTaskId()
uuid := uuid.UUID(msg.GetUuid())
if driver.stopped() {
log.Infof("Ignoring status update acknowledgement %v for task %v of framework %v because the driver is stopped!\n",
uuid, taskID, frameworkID)
}
// Remove the corresponding update.
delete(driver.updates, uuid.String())
// Remove the corresponding task.
delete(driver.tasks, taskID.String())
}
func (driver *MesosExecutorDriver) frameworkMessage(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring frameworkMessage message from slave because aborted")
return
}
log.Infoln("Executor driver received frameworkMessage")
msg := pbMsg.(*mesosproto.FrameworkToExecutorMessage)
data := msg.GetData()
if driver.stopped() {
log.Infof("Ignoring framework message because the driver is stopped!\n")
return
}
log.Infof("Executor driver receives framework message\n")
driver.withExecutor(func(e Executor) { e.FrameworkMessage(driver, string(data)) })
}
func (driver *MesosExecutorDriver) shutdown(_ context.Context, _ *upid.UPID, _ proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring shutdown message because aborted")
return
}
log.Infoln("Executor driver received shutdown")
if driver.stopped() {
log.Infof("Ignoring shutdown message because the driver is stopped!\n")
return
}
log.Infof("Executor driver is asked to shutdown\n")
driver.withExecutor(func(e Executor) { e.Shutdown(driver) })
// driver.stop() will cause process to eventually stop.
driver.stop()
}
func (driver *MesosExecutorDriver) frameworkError(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
if driver.status == mesosproto.Status_DRIVER_ABORTED {
log.V(1).Infof("ignoring framework error message because aborted")
return
}
log.Infoln("Executor driver received error")
msg := pbMsg.(*mesosproto.FrameworkErrorMessage)
driver.withExecutor(func(e Executor) { e.Error(driver, msg.GetMessage()) })
}
// ------------------------ Driver Implementation ----------------- //
// Start starts the executor driver
func (driver *MesosExecutorDriver) Start() (mesosproto.Status, error) {
driver.lock.Lock()
defer driver.lock.Unlock()
return driver.start()
}
func (driver *MesosExecutorDriver) start() (mesosproto.Status, error) {
log.Infoln("Starting the executor driver")
if driver.status != mesosproto.Status_DRIVER_NOT_STARTED {
return driver.status, fmt.Errorf("Unable to Start, expecting status %s, but got %s", mesosproto.Status_DRIVER_NOT_STARTED, driver.status)
}
// Start the messenger.
if err := driver.messenger.Start(); err != nil {
log.Errorf("Failed to start executor: %v\n", err)
return driver.status, err
}
pid := driver.messenger.UPID()
driver.self = &pid
// Register with slave.
log.V(3).Infoln("Sending Executor registration")
message := &mesosproto.RegisterExecutorMessage{
FrameworkId: driver.frameworkID,
ExecutorId: driver.executorID,
}
if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorf("Stopping the executor, failed to send %v: %v\n", message, err)
err0 := driver._stop(driver.status)
if err0 != nil {
log.Errorf("Failed to stop executor: %v\n", err)
return driver.status, err0
}
return driver.status, err
}
driver.status = mesosproto.Status_DRIVER_RUNNING
close(driver.started)
log.Infoln("Mesos executor is started with PID=", driver.self.String())
return driver.status, nil
}
// Stop stops the driver by sending a 'stopEvent' to the event loop, and
// receives the result from the response channel.
func (driver *MesosExecutorDriver) Stop() (mesosproto.Status, error) {
driver.lock.Lock()
defer driver.lock.Unlock()
return driver.stop()
}
func (driver *MesosExecutorDriver) stop() (mesosproto.Status, error) {
log.Infoln("Stopping the executor driver")
if driver.status != mesosproto.Status_DRIVER_RUNNING {
return driver.status, fmt.Errorf("Unable to Stop, expecting status %s, but got %s", mesosproto.Status_DRIVER_RUNNING, driver.status)
}
return mesosproto.Status_DRIVER_STOPPED, driver._stop(mesosproto.Status_DRIVER_STOPPED)
}
// internal function for stopping the driver and set reason for stopping
// Note that messages inflight or queued will not be processed.
func (driver *MesosExecutorDriver) _stop(stopStatus mesosproto.Status) error {
err := driver.messenger.Stop()
defer func() {
select {
case <-driver.stopCh:
// already closed
default:
close(driver.stopCh)
}
driver.cond.Broadcast()
}()
driver.status = stopStatus
if err != nil {
return err
}
return nil
}
// Abort aborts the driver by sending an 'abortEvent' to the event loop, and
// receives the result from the response channel.
func (driver *MesosExecutorDriver) Abort() (mesosproto.Status, error) {
driver.lock.Lock()
defer driver.lock.Unlock()
return driver.abort()
}
func (driver *MesosExecutorDriver) abort() (mesosproto.Status, error) {
if driver.status != mesosproto.Status_DRIVER_RUNNING {
return driver.status, fmt.Errorf("Unable to Stop, expecting status %s, but got %s", mesosproto.Status_DRIVER_RUNNING, driver.status)
}
log.Infoln("Aborting the executor driver")
return mesosproto.Status_DRIVER_ABORTED, driver._stop(mesosproto.Status_DRIVER_ABORTED)
}
// Join waits for the driver by sending a 'joinEvent' to the event loop, and wait
// on a channel for the notification of driver termination.
func (driver *MesosExecutorDriver) Join() (mesosproto.Status, error) {
driver.lock.Lock()
defer driver.lock.Unlock()
return driver.join()
}
func (driver *MesosExecutorDriver) join() (mesosproto.Status, error) {
log.Infoln("Waiting for the executor driver to stop")
if driver.status != mesosproto.Status_DRIVER_RUNNING {
return driver.status, fmt.Errorf("Unable to Join, expecting status %s, but got %s", mesosproto.Status_DRIVER_RUNNING, driver.status)
}
for {
select {
case <-driver.stopCh: // wait for stop signal
return driver.status, nil
default:
driver.cond.Wait()
}
}
}
// Run starts the driver and calls Join() to wait for stop request.
func (driver *MesosExecutorDriver) Run() (mesosproto.Status, error) {
driver.lock.Lock()
defer driver.lock.Unlock()
return driver.run()
}
func (driver *MesosExecutorDriver) run() (mesosproto.Status, error) {
stat, err := driver.start()
if err != nil {
return driver.stop()
}
if stat != mesosproto.Status_DRIVER_RUNNING {
return stat, fmt.Errorf("Unable to continue to Run, expecting status %s, but got %s", mesosproto.Status_DRIVER_RUNNING, driver.status)
}
return driver.join()
}
// SendStatusUpdate sends status updates to the slave.
func (driver *MesosExecutorDriver) SendStatusUpdate(taskStatus *mesosproto.TaskStatus) (mesosproto.Status, error) {
driver.lock.Lock()
defer driver.lock.Unlock()
return driver.sendStatusUpdate(taskStatus)
}
func (driver *MesosExecutorDriver) sendStatusUpdate(taskStatus *mesosproto.TaskStatus) (mesosproto.Status, error) {
log.V(3).Infoln("Sending task status update: ", taskStatus.String())
if driver.status != mesosproto.Status_DRIVER_RUNNING {
return driver.status, fmt.Errorf("Unable to SendStatusUpdate, expecting driver.status %s, but got %s", mesosproto.Status_DRIVER_RUNNING, driver.status)
}
if taskStatus.GetState() == mesosproto.TaskState_TASK_STAGING {
err := fmt.Errorf("Executor is not allowed to send TASK_STAGING status update. Aborting!")
log.Errorln(err)
if err0 := driver._stop(mesosproto.Status_DRIVER_ABORTED); err0 != nil {
log.Errorln("Error while stopping the driver", err0)
}
return driver.status, err
}
// Set up status update.
update := driver.makeStatusUpdate(taskStatus)
log.Infof("Executor sending status update %v\n", update.String())
// Capture the status update.
driver.updates[uuid.UUID(update.GetUuid()).String()] = update
// Put the status update in the message.
message := &mesosproto.StatusUpdateMessage{
Update: update,
Pid: proto.String(driver.self.String()),
}
// Send the message.
if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorf("Failed to send %v: %v\n", message, err)
return driver.status, err
}
return driver.status, nil
}
func (driver *MesosExecutorDriver) makeStatusUpdate(taskStatus *mesosproto.TaskStatus) *mesosproto.StatusUpdate {
now := float64(time.Now().Unix())
// Fill in all the fields.
taskStatus.Timestamp = proto.Float64(now)
taskStatus.SlaveId = driver.slaveID
update := &mesosproto.StatusUpdate{
FrameworkId: driver.frameworkID,
ExecutorId: driver.executorID,
SlaveId: driver.slaveID,
Status: taskStatus,
Timestamp: proto.Float64(now),
Uuid: uuid.NewUUID(),
}
return update
}
// SendFrameworkMessage sends the framework message by sending a 'sendFrameworkMessageEvent'
// to the event loop, and receives the result from the response channel.
func (driver *MesosExecutorDriver) SendFrameworkMessage(data string) (mesosproto.Status, error) {
driver.lock.Lock()
defer driver.lock.Unlock()
return driver.sendFrameworkMessage(data)
}
func (driver *MesosExecutorDriver) sendFrameworkMessage(data string) (mesosproto.Status, error) {
log.V(3).Infoln("Sending framework message", string(data))
if driver.status != mesosproto.Status_DRIVER_RUNNING {
return driver.status, fmt.Errorf("Unable to SendFrameworkMessage, expecting status %s, but got %s", mesosproto.Status_DRIVER_RUNNING, driver.status)
}
message := &mesosproto.ExecutorToFrameworkMessage{
SlaveId: driver.slaveID,
FrameworkId: driver.frameworkID,
ExecutorId: driver.executorID,
Data: []byte(data),
}
// Send the message.
if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorln("Failed to send message %v: %v", message, err)
return driver.status, err
}
return driver.status, nil
}

View File

@ -1,42 +0,0 @@
package executor
import (
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
type TestDriver struct {
*MesosExecutorDriver
}
func (e *TestDriver) SetConnected(b bool) {
e.guarded(func() {
e.connected = b
})
}
func (e *TestDriver) SetMessenger(m messenger.Messenger) {
e.messenger = m
}
func (e *TestDriver) Started() <-chan struct{} {
return e.started
}
func (e *TestDriver) guarded(f func()) {
e.lock.Lock()
defer e.lock.Unlock()
f()
}
func (e *TestDriver) Context() context.Context {
return e.context()
}
func (e *TestDriver) StatusUpdateAcknowledgement(ctx context.Context, from *upid.UPID, msg proto.Message) {
e.guarded(func() {
e.statusUpdateAcknowledgement(ctx, from, msg)
})
}

File diff suppressed because it is too large Load Diff

View File

@ -1,347 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mesosproto.scheduler;
import "github.com/mesos/mesos-go/mesosproto/mesos.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option go_package = "scheduler";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
/**
* Scheduler event API.
*
* An event is described using the standard protocol buffer "union"
* trick, see:
* https://developers.google.com/protocol-buffers/docs/techniques#union.
*/
message Event {
// Possible event types, followed by message definitions if
// applicable.
enum Type {
SUBSCRIBED = 1; // See 'Subscribed' below.
OFFERS = 2; // See 'Offers' below.
RESCIND = 3; // See 'Rescind' below.
UPDATE = 4; // See 'Update' below.
MESSAGE = 5; // See 'Message' below.
FAILURE = 6; // See 'Failure' below.
ERROR = 7; // See 'Error' below.
// Periodic message sent by the Mesos master according to
// 'Subscribed.heartbeat_interval_seconds'. If the scheduler does
// not receive any events (including heartbeats) for an extended
// period of time (e.g., 5 x heartbeat_interval_seconds), there is
// likely a network partition. In such a case the scheduler should
// close the existing subscription connection and resubscribe
// using a backoff strategy.
HEARTBEAT = 8;
}
// First event received when the scheduler subscribes.
message Subscribed {
required FrameworkID framework_id = 1;
// This value will be set if the master is sending heartbeats. See
// the comment above on 'HEARTBEAT' for more details.
optional double heartbeat_interval_seconds = 2;
}
// Received whenever there are new resources that are offered to the
// scheduler or resources requested back from the scheduler. Each
// offer corresponds to a set of resources on a slave. Until the
// scheduler accepts or declines an offer the resources are
// considered allocated to the scheduler. Accepting or Declining an
// inverse offer informs the allocator of the scheduler's ability to
// release the resources without violating an SLA.
message Offers {
repeated Offer offers = 1;
repeated InverseOffer inverse_offers = 2;
}
// Received when a particular offer is no longer valid (e.g., the
// slave corresponding to the offer has been removed) and hence
// needs to be rescinded. Any future calls ('Accept' / 'Decline') made
// by the scheduler regarding this offer will be invalid.
message Rescind {
required OfferID offer_id = 1;
}
// Received whenever there is a status update that is generated by
// the executor or slave or master. Status updates should be used by
// executors to reliably communicate the status of the tasks that
// they manage. It is crucial that a terminal update (see TaskState
// in mesos.proto) is sent by the executor as soon as the task
// terminates, in order for Mesos to release the resources allocated
// to the task. It is also the responsibility of the scheduler to
// explicitly acknowledge the receipt of a status update. See
// 'Acknowledge' in the 'Call' section below for the semantics.
message Update {
required TaskStatus status = 1;
}
// Received when a custom message generated by the executor is
// forwarded by the master. Note that this message is not
// interpreted by Mesos and is only forwarded (without reliability
// guarantees) to the scheduler. It is up to the executor to retry
// if the message is dropped for any reason.
message Message {
required SlaveID slave_id = 1;
required ExecutorID executor_id = 2;
required bytes data = 3;
}
// Received when a slave is removed from the cluster (e.g., failed
// health checks) or when an executor is terminated. Note that, this
// event coincides with receipt of terminal UPDATE events for any
// active tasks belonging to the slave or executor and receipt of
// 'Rescind' events for any outstanding offers belonging to the
// slave. Note that there is no guaranteed order between the
// 'Failure', 'Update' and 'Rescind' events when a slave or executor
// is removed.
// TODO(vinod): Consider splitting the lost slave and terminated
// executor into separate events and ensure it's reliably generated.
message Failure {
optional SlaveID slave_id = 1;
// If this was just a failure of an executor on a slave then
// 'executor_id' will be set and possibly 'status' (if we were
// able to determine the exit status).
optional ExecutorID executor_id = 2;
optional int32 status = 3;
}
// Received when an invalid framework (e.g., unauthenticated,
// unauthorized) attempts to subscribe with the master. Error can
// also be received if scheduler sends invalid Calls (e.g., not
// properly initialized).
// TODO(vinod): Remove this once the old scheduler driver is no
// longer supported. With HTTP API all errors will be signaled via
// HTTP response codes.
message Error {
required string message = 1;
}
// Type of the event, indicates which optional field below should be
// present if that type has a nested message definition.
required Type type = 1;
optional Subscribed subscribed = 2;
optional Offers offers = 3;
optional Rescind rescind = 4;
optional Update update = 5;
optional Message message = 6;
optional Failure failure = 7;
optional Error error = 8;
}
/**
* Scheduler call API.
*
* Like Event, a Call is described using the standard protocol buffer
* "union" trick (see above).
*/
message Call {
// Possible call types, followed by message definitions if
// applicable.
enum Type {
SUBSCRIBE = 1; // See 'Subscribe' below.
TEARDOWN = 2; // Shuts down all tasks/executors and removes framework.
ACCEPT = 3; // See 'Accept' below.
DECLINE = 4; // See 'Decline' below.
REVIVE = 5; // Removes any previous filters set via ACCEPT or DECLINE.
KILL = 6; // See 'Kill' below.
SHUTDOWN = 7; // See 'Shutdown' below.
ACKNOWLEDGE = 8; // See 'Acknowledge' below.
RECONCILE = 9; // See 'Reconcile' below.
MESSAGE = 10; // See 'Message' below.
REQUEST = 11; // See 'Request' below.
SUPPRESS = 12; // Inform master to stop sending offers to the framework.
// TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
// already subscribed frameworks as a way of stopping offers from
// being generated and other events from being sent by the master.
// Note that this functionality existed originally to support
// SchedulerDriver::abort which was only necessary to handle
// exceptions getting thrown from within Scheduler callbacks,
// something that is not an issue with the Event/Call API.
}
// Subscribes the scheduler with the master to receive events. A
// scheduler must send other calls only after it has received the
// SUBCRIBED event.
message Subscribe {
// See the comments below on 'framework_id' on the semantics for
// 'framework_info.id'.
required FrameworkInfo framework_info = 1;
// 'force' field is only relevant when 'framework_info.id' is set.
// It tells the master what to do in case an instance of the
// scheduler attempts to subscribe when another instance of it is
// already connected (e.g., split brain due to network partition).
// If 'force' is true, this scheduler instance is allowed and the
// old connected scheduler instance is disconnected. If false,
// this scheduler instance is disallowed subscription in favor of
// the already connected scheduler instance.
//
// It is recommended to set this to true only when a newly elected
// scheduler instance is attempting to subscribe but not when a
// scheduler is retrying subscription (e.g., disconnection or
// master failover; see sched/sched.cpp for an example).
optional bool force = 2;
}
// Accepts an offer, performing the specified operations
// in a sequential manner.
//
// E.g. Launch a task with a newly reserved persistent volume:
//
// Accept {
// offer_ids: [ ... ]
// operations: [
// { type: RESERVE,
// reserve: { resources: [ disk(role):2 ] } }
// { type: CREATE,
// create: { volumes: [ disk(role):1+persistence ] } }
// { type: LAUNCH,
// launch: { task_infos ... disk(role):1;disk(role):1+persistence } }
// ]
// }
//
// Note that any of the offers resources not used in the 'Accept'
// call (e.g., to launch a task) are considered unused and might be
// reoffered to other frameworks. In other words, the same OfferID
// cannot be used in more than one 'Accept' call.
message Accept {
repeated OfferID offer_ids = 1;
repeated Offer.Operation operations = 2;
optional Filters filters = 3;
}
// Declines an offer, signaling the master to potentially reoffer
// the resources to a different framework. Note that this is same
// as sending an Accept call with no operations. See comments on
// top of 'Accept' for semantics.
message Decline {
repeated OfferID offer_ids = 1;
optional Filters filters = 2;
}
// Kills a specific task. If the scheduler has a custom executor,
// the kill is forwarded to the executor and it is up to the
// executor to kill the task and send a TASK_KILLED (or TASK_FAILED)
// update. Note that Mesos releases the resources for a task once it
// receives a terminal update (See TaskState in mesos.proto) for it.
// If the task is unknown to the master, a TASK_LOST update is
// generated.
message Kill {
required TaskID task_id = 1;
optional SlaveID slave_id = 2;
}
// Shuts down a custom executor. When the executor gets a shutdown
// event, it is expected to kill all its tasks (and send TASK_KILLED
// updates) and terminate. If the executor doesnt terminate within
// a certain timeout (configurable via
// '--executor_shutdown_grace_period' slave flag), the slave will
// forcefully destroy the container (executor and its tasks) and
// transition its active tasks to TASK_LOST.
message Shutdown {
required ExecutorID executor_id = 1;
required SlaveID slave_id = 2;
}
// Acknowledges the receipt of status update. Schedulers are
// responsible for explicitly acknowledging the receipt of status
// updates that have 'Update.status().uuid()' field set. Such status
// updates are retried by the slave until they are acknowledged by
// the scheduler.
message Acknowledge {
required SlaveID slave_id = 1;
required TaskID task_id = 2;
required bytes uuid = 3;
}
// Allows the scheduler to query the status for non-terminal tasks.
// This causes the master to send back the latest task status for
// each task in 'tasks', if possible. Tasks that are no longer known
// will result in a TASK_LOST update. If 'statuses' is empty, then
// the master will send the latest status for each task currently
// known.
message Reconcile {
// TODO(vinod): Support arbitrary queries than just state of tasks.
message Task {
required TaskID task_id = 1;
optional SlaveID slave_id = 2;
}
repeated Task tasks = 1;
}
// Sends arbitrary binary data to the executor. Note that Mesos
// neither interprets this data nor makes any guarantees about the
// delivery of this message to the executor.
message Message {
required SlaveID slave_id = 1;
required ExecutorID executor_id = 2;
required bytes data = 3;
}
// Requests a specific set of resources from Mesos's allocator. If
// the allocator has support for this, corresponding offers will be
// sent asynchronously via the OFFERS event(s).
//
// NOTE: The built-in hierarchical allocator doesn't have support
// for this call and hence simply ignores it.
message Request {
repeated mesosproto.Request requests = 1;
}
// Identifies who generated this call. Master assigns a framework id
// when a new scheduler subscribes for the first time. Once assigned,
// the scheduler must set the 'framework_id' here and within its
// FrameworkInfo (in any further 'Subscribe' calls). This allows the
// master to identify a scheduler correctly across disconnections,
// failovers, etc.
optional FrameworkID framework_id = 1;
// Type of the call, indicates which optional field below should be
// present if that type has a nested message definition.
required Type type = 2;
optional Subscribe subscribe = 3;
optional Accept accept = 4;
optional Decline decline = 5;
optional Kill kill = 6;
optional Shutdown shutdown = 7;
optional Acknowledge acknowledge = 8;
optional Reconcile reconcile = 9;
optional Message message = 10;
optional Request request = 11;
}

View File

@ -1,34 +0,0 @@
package process
import (
"fmt"
"sync"
)
var (
pidLock sync.Mutex
pid uint64
)
func nextPid() uint64 {
pidLock.Lock()
defer pidLock.Unlock()
pid++
return pid
}
//TODO(jdef) add lifecycle funcs
//TODO(jdef) add messaging funcs
type Process struct {
label string
}
func New(kind string) *Process {
return &Process{
label: fmt.Sprintf("%s(%d)", kind, nextPid()),
}
}
func (p *Process) Label() string {
return p.label
}

View File

@ -1,39 +0,0 @@
####Benchmark of the messenger.
```shell
$ go test -v -run=Benckmark* -bench=.
PASS
BenchmarkMessengerSendSmallMessage 50000 70568 ns/op
BenchmarkMessengerSendMediumMessage 50000 70265 ns/op
BenchmarkMessengerSendBigMessage 50000 72693 ns/op
BenchmarkMessengerSendLargeMessage 50000 72896 ns/op
BenchmarkMessengerSendMixedMessage 50000 72631 ns/op
BenchmarkMessengerSendRecvSmallMessage 20000 78409 ns/op
BenchmarkMessengerSendRecvMediumMessage 20000 80471 ns/op
BenchmarkMessengerSendRecvBigMessage 20000 82629 ns/op
BenchmarkMessengerSendRecvLargeMessage 20000 85987 ns/op
BenchmarkMessengerSendRecvMixedMessage 20000 83678 ns/op
ok github.com/mesos/mesos-go/messenger 115.135s
$ go test -v -run=Benckmark* -bench=. -cpu=4 -send-routines=4 2>/dev/null
PASS
BenchmarkMessengerSendSmallMessage-4 50000 35529 ns/op
BenchmarkMessengerSendMediumMessage-4 50000 35997 ns/op
BenchmarkMessengerSendBigMessage-4 50000 36871 ns/op
BenchmarkMessengerSendLargeMessage-4 50000 37310 ns/op
BenchmarkMessengerSendMixedMessage-4 50000 37419 ns/op
BenchmarkMessengerSendRecvSmallMessage-4 50000 39320 ns/op
BenchmarkMessengerSendRecvMediumMessage-4 50000 41990 ns/op
BenchmarkMessengerSendRecvBigMessage-4 50000 42157 ns/op
BenchmarkMessengerSendRecvLargeMessage-4 50000 45472 ns/op
BenchmarkMessengerSendRecvMixedMessage-4 50000 47393 ns/op
ok github.com/mesos/mesos-go/messenger 105.173s
```
####environment:
```
OS: Linux yifan-laptop 3.13.0-32-generic #57-Ubuntu SMP Tue Jul 15 03:51:08 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux
CPU: Intel(R) Core(TM) i5-3210M CPU @ 2.50GHz
MEM: 4G DDR3 1600MHz
```

View File

@ -1,638 +0,0 @@
package messenger
import (
"bufio"
"bytes"
"errors"
"io"
"net"
"net/http"
"net/textproto"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
log "github.com/golang/glog"
)
const (
// writeFlushPeriod is the amount of time we're willing to wait for a single
// response buffer to be fully written to the underlying TCP connection; after
// this amount of time the remaining bytes of the response are discarded. see
// responseWriter().
writeFlushPeriod = 30 * time.Second
)
type decoderID int32
func (did decoderID) String() string {
return "[" + strconv.Itoa(int(did)) + "]"
}
func (did *decoderID) next() decoderID {
return decoderID(atomic.AddInt32((*int32)(did), 1))
}
var (
errHijackFailed = errors.New("failed to hijack http connection")
did decoderID // decoder ID counter
)
type Decoder interface {
Requests() <-chan *Request
Err() <-chan error
Cancel(bool)
}
type Request struct {
*http.Request
response chan<- Response // callers that are finished with a Request should ensure that response is *always* closed, regardless of whether a Response has been written.
}
type Response struct {
code int
reason string
}
type httpDecoder struct {
req *http.Request // original request
kalive bool // keepalive
chunked bool // chunked
msg chan *Request
con net.Conn
rw *bufio.ReadWriter
errCh chan error
buf *bytes.Buffer
lrc *io.LimitedReader
shouldQuit chan struct{} // signal chan, closes upon calls to Cancel(...)
forceQuit chan struct{} // signal chan, indicates that quit is NOT graceful; closes upon Cancel(false)
cancelGuard sync.Mutex
readTimeout time.Duration
writeTimeout time.Duration
idtag string // useful for debugging
sendError func(err error) // abstraction for error handling
outCh chan *bytes.Buffer // chan of responses to be written to the connection
}
// DecodeHTTP hijacks an HTTP server connection and generates mesos libprocess HTTP
// requests via the returned chan. Upon generation of an error in the error chan the
// decoder's internal goroutine will terminate. This func returns immediately.
// The caller should immediately *stop* using the ResponseWriter and Request that were
// passed as parameters; the decoder assumes full control of the HTTP transport.
func DecodeHTTP(w http.ResponseWriter, r *http.Request) Decoder {
id := did.next()
d := &httpDecoder{
msg: make(chan *Request),
errCh: make(chan error, 1),
req: r,
shouldQuit: make(chan struct{}),
forceQuit: make(chan struct{}),
readTimeout: ReadTimeout,
writeTimeout: WriteTimeout,
idtag: id.String(),
outCh: make(chan *bytes.Buffer),
}
d.sendError = d.defaultSendError
go d.run(w)
return d
}
func (d *httpDecoder) Requests() <-chan *Request {
return d.msg
}
func (d *httpDecoder) Err() <-chan error {
return d.errCh
}
// Cancel the decoding process; if graceful then process pending responses before terminating
func (d *httpDecoder) Cancel(graceful bool) {
log.V(2).Infof("%scancel:%t", d.idtag, graceful)
d.cancelGuard.Lock()
defer d.cancelGuard.Unlock()
select {
case <-d.shouldQuit:
// already quitting, but perhaps gracefully?
default:
close(d.shouldQuit)
}
// allow caller to "upgrade" from a graceful cancel to a forced one
if !graceful {
select {
case <-d.forceQuit:
// already forcefully quitting
default:
close(d.forceQuit) // push it!
}
}
}
func (d *httpDecoder) run(res http.ResponseWriter) {
defer func() {
close(d.outCh) // we're finished generating response objects
log.V(2).Infoln(d.idtag + "run: terminating")
}()
for state := d.bootstrapState(res); state != nil; {
next := state(d)
state = next
}
}
// tryFlushResponse flushes the response buffer (if not empty); returns true if flush succeeded
func (d *httpDecoder) tryFlushResponse(out *bytes.Buffer) {
log.V(2).Infof(d.idtag+"try-flush-responses: %d bytes to flush", out.Len())
// set a write deadline here so that we don't block for very long.
err := d.setWriteTimeout()
if err != nil {
// this is a problem because if we can't set the timeout then we can't guarantee
// how long a write op might block for. Log the error and skip this response.
log.Errorln("failed to set write deadline, aborting response:", err.Error())
} else {
_, err = out.WriteTo(d.rw.Writer)
if err != nil {
if neterr, ok := err.(net.Error); ok && neterr.Timeout() && out.Len() > 0 {
// we couldn't fully write before timing out, return rch and hope that
// we have better luck next time.
return
}
// we don't really know how to deal with other kinds of errors, so
// log it and skip the rest of the response.
log.Errorln("failed to write response buffer:", err.Error())
}
err = d.rw.Flush()
if err != nil {
if neterr, ok := err.(net.Error); ok && neterr.Timeout() && out.Len() > 0 {
return
}
log.Errorln("failed to flush response buffer:", err.Error())
}
}
}
// TODO(jdef) make this a func on Response, to write its contents to a *bytes.Buffer
func (d *httpDecoder) buildResponseEntity(resp *Response) *bytes.Buffer {
log.V(2).Infoln(d.idtag + "build-response-entity")
out := &bytes.Buffer{}
// generate new response buffer content and continue; buffer should have
// at least a response status-line w/ Content-Length: 0
out.WriteString("HTTP/1.1 ")
out.WriteString(strconv.Itoa(resp.code))
out.WriteString(" ")
out.WriteString(resp.reason)
out.WriteString(crlf + "Content-Length: 0" + crlf)
select {
case <-d.shouldQuit:
// this is the last request in the pipeline and we've been told to quit, so
// indicate that the server will close the connection.
out.WriteString("Connection: Close" + crlf)
default:
}
out.WriteString(crlf) // this ends the HTTP response entity
return out
}
// updateForRequest updates the chunked and kalive fields of the decoder to align
// with the header values of the request
func (d *httpDecoder) updateForRequest(bootstrapping bool) {
// check "Transfer-Encoding" for "chunked"
d.chunked = false
for _, v := range d.req.Header["Transfer-Encoding"] {
if v == "chunked" {
d.chunked = true
break
}
}
if !d.chunked && d.req.ContentLength < 0 {
if bootstrapping {
// strongly suspect that Go's internal net/http lib is stripping
// the Transfer-Encoding header from the initial request, so this
// workaround makes a very mesos-specific assumption: an unknown
// Content-Length indicates a chunked stream.
d.chunked = true
} else {
// via https://tools.ietf.org/html/rfc7230#section-3.3.2
d.req.ContentLength = 0
}
}
// check "Connection" for "Keep-Alive"
d.kalive = d.req.Header.Get("Connection") == "Keep-Alive"
log.V(2).Infof(d.idtag+"update-for-request: chunked %v keep-alive %v", d.chunked, d.kalive)
}
func (d *httpDecoder) readBodyContent() httpState {
log.V(2).Info(d.idtag + "read-body-content")
if d.chunked {
d.buf = &bytes.Buffer{}
return readChunkHeaderState
} else {
d.lrc = limit(d.rw.Reader, d.req.ContentLength)
d.buf = &bytes.Buffer{}
return readBodyState
}
}
const http202response = "HTTP/1.1 202 OK\r\nContent-Length: 0\r\n\r\n"
func (d *httpDecoder) generateRequest() httpState {
log.V(2).Infof(d.idtag + "generate-request")
// send a Request to msg
b := d.buf.Bytes()
rch := make(chan Response, 1)
r := &Request{
Request: &http.Request{
Method: d.req.Method,
URL: d.req.URL,
Proto: d.req.Proto,
ProtoMajor: d.req.ProtoMajor,
ProtoMinor: d.req.ProtoMinor,
Header: d.req.Header,
Close: !d.kalive,
Host: d.req.Host,
RequestURI: d.req.RequestURI,
Body: &body{bytes.NewBuffer(b)},
ContentLength: int64(len(b)),
},
response: rch,
}
select {
case d.msg <- r:
case <-d.forceQuit:
return terminateState
}
select {
case <-d.forceQuit:
return terminateState
case resp, ok := <-rch:
if ok {
// response required, so build it and ship it
out := d.buildResponseEntity(&resp)
select {
case <-d.forceQuit:
return terminateState
case d.outCh <- out:
}
}
}
if d.kalive {
d.req = &http.Request{
ContentLength: -1,
Header: make(http.Header),
}
return awaitRequestState
} else {
return gracefulTerminateState
}
}
func (d *httpDecoder) defaultSendError(err error) {
d.errCh <- err
}
type httpState func(d *httpDecoder) httpState
// terminateState forcefully shuts down the state machine
func terminateState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag + "terminate-state")
// closing these chans tells Decoder users that it's wrapping up
close(d.msg)
close(d.errCh)
// attempt to forcefully close the connection and signal response handlers that
// no further responses should be written
d.Cancel(false)
if d.con != nil {
d.con.Close()
}
// there is no spoon
return nil
}
func gracefulTerminateState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag + "gracefully-terminate-state")
// closing these chans tells Decoder users that it's wrapping up
close(d.msg)
close(d.errCh)
// gracefully terminate the connection; signal that we should flush pending
// responses before closing the connection.
d.Cancel(true)
return nil
}
func limit(r *bufio.Reader, limit int64) *io.LimitedReader {
return &io.LimitedReader{
R: r,
N: limit,
}
}
// bootstrapState expects to be called when the standard net/http lib has already
// read the initial request query line + headers from a connection. the request
// is ready to be hijacked at this point.
func (d *httpDecoder) bootstrapState(res http.ResponseWriter) httpState {
log.V(2).Infoln(d.idtag + "bootstrap-state")
d.updateForRequest(true)
// hijack
hj, ok := res.(http.Hijacker)
if !ok {
http.Error(res, "server does not support hijack", http.StatusInternalServerError)
d.sendError(errHijackFailed)
return terminateState
}
c, rw, err := hj.Hijack()
if err != nil {
http.Error(res, "failed to hijack the connection", http.StatusInternalServerError)
d.sendError(errHijackFailed)
return terminateState
}
d.rw = rw
d.con = c
go d.responseWriter()
return d.readBodyContent()
}
func (d *httpDecoder) responseWriter() {
defer func() {
log.V(3).Infoln(d.idtag + "response-writer: closing connection")
d.con.Close()
}()
for buf := range d.outCh {
//TODO(jdef) I worry about this busy-looping
// write & flush the buffer until there's nothing left in it, or else
// we exceed the write/flush period.
now := time.Now()
for buf.Len() > 0 && time.Since(now) < writeFlushPeriod {
select {
case <-d.forceQuit:
return
default:
}
d.tryFlushResponse(buf)
}
if buf.Len() > 0 {
//TODO(jdef) should we abort the entire connection instead? a partially written
// response doesn't do anyone any good. That said, real libprocess agents don't
// really care about the response channel anyway - the entire system is fire and
// forget. So I've decided to err on the side that we might lose response bytes
// in favor of completely reading the connection request stream before we terminate.
log.Errorln(d.idtag + "failed to fully flush output buffer within write-flush period")
}
}
}
type body struct {
*bytes.Buffer
}
func (b *body) Close() error { return nil }
// checkTimeoutOrFail tests whether the given error is related to a timeout condition.
// returns true if the caller should advance to the returned state.
func (d *httpDecoder) checkTimeoutOrFail(err error, stateContinue httpState) (httpState, bool) {
if err != nil {
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
select {
case <-d.forceQuit:
return terminateState, true
case <-d.shouldQuit:
return gracefulTerminateState, true
default:
return stateContinue, true
}
}
d.sendError(err)
return terminateState, true
}
return nil, false
}
func (d *httpDecoder) setReadTimeoutOrFail() bool {
if d.readTimeout > 0 {
err := d.con.SetReadDeadline(time.Now().Add(d.readTimeout))
if err != nil {
d.sendError(err)
return false
}
}
return true
}
func (d *httpDecoder) setWriteTimeout() error {
if d.writeTimeout > 0 {
return d.con.SetWriteDeadline(time.Now().Add(d.writeTimeout))
}
return nil
}
func readChunkHeaderState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag + "read-chunk-header-state")
tr := textproto.NewReader(d.rw.Reader)
if !d.setReadTimeoutOrFail() {
return terminateState
}
hexlen, err := tr.ReadLine()
if next, ok := d.checkTimeoutOrFail(err, readChunkHeaderState); ok {
return next
}
clen, err := strconv.ParseInt(hexlen, 16, 64)
if err != nil {
d.sendError(err)
return terminateState
}
if clen == 0 {
return readEndOfChunkStreamState
}
d.lrc = limit(d.rw.Reader, clen)
return readChunkState
}
func readChunkState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag+"read-chunk-state, bytes remaining:", d.lrc.N)
if !d.setReadTimeoutOrFail() {
return terminateState
}
_, err := d.buf.ReadFrom(d.lrc)
if next, ok := d.checkTimeoutOrFail(err, readChunkState); ok {
return next
}
return readEndOfChunkState
}
const crlf = "\r\n"
func readEndOfChunkState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag + "read-end-of-chunk-state")
if !d.setReadTimeoutOrFail() {
return terminateState
}
b, err := d.rw.Reader.Peek(2)
if len(b) == 2 {
if string(b) == crlf {
d.rw.ReadByte()
d.rw.ReadByte()
return readChunkHeaderState
}
d.sendError(errors.New(d.idtag + "unexpected data at end-of-chunk marker"))
return terminateState
}
// less than two bytes avail
if next, ok := d.checkTimeoutOrFail(err, readEndOfChunkState); ok {
return next
}
panic("couldn't peek 2 bytes, but didn't get an error?!")
}
func readEndOfChunkStreamState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag + "read-end-of-chunk-stream-state")
if !d.setReadTimeoutOrFail() {
return terminateState
}
b, err := d.rw.Reader.Peek(2)
if len(b) == 2 {
if string(b) == crlf {
d.rw.ReadByte()
d.rw.ReadByte()
return d.generateRequest()
}
d.sendError(errors.New(d.idtag + "unexpected data at end-of-chunk marker"))
return terminateState
}
// less than 2 bytes avail
if next, ok := d.checkTimeoutOrFail(err, readEndOfChunkStreamState); ok {
return next
}
panic("couldn't peek 2 bytes, but didn't get an error?!")
}
func readBodyState(d *httpDecoder) httpState {
log.V(2).Infof(d.idtag+"read-body-state: %d bytes remaining", d.lrc.N)
// read remaining bytes into the buffer
var err error
if d.lrc.N > 0 {
if !d.setReadTimeoutOrFail() {
return terminateState
}
_, err = d.buf.ReadFrom(d.lrc)
}
if d.lrc.N <= 0 {
return d.generateRequest()
}
if next, ok := d.checkTimeoutOrFail(err, readBodyState); ok {
return next
}
return readBodyState
}
func isGracefulTermSignal(err error) bool {
if err == io.EOF {
return true
}
if operr, ok := err.(*net.OpError); ok {
return operr.Op == "read" && err == syscall.ECONNRESET
}
return false
}
func awaitRequestState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag + "await-request-state")
tr := textproto.NewReader(d.rw.Reader)
if !d.setReadTimeoutOrFail() {
return terminateState
}
requestLine, err := tr.ReadLine()
if requestLine == "" && isGracefulTermSignal(err) {
// we're actually expecting this at some point, so don't react poorly
return gracefulTerminateState
}
if next, ok := d.checkTimeoutOrFail(err, awaitRequestState); ok {
return next
}
ss := strings.SplitN(requestLine, " ", 3)
if len(ss) < 3 {
if err == io.EOF {
return gracefulTerminateState
}
d.sendError(errors.New(d.idtag + "illegal request line"))
return terminateState
}
r := d.req
r.Method = ss[0]
r.RequestURI = ss[1]
r.URL, err = url.ParseRequestURI(ss[1])
if err != nil {
d.sendError(err)
return terminateState
}
major, minor, ok := http.ParseHTTPVersion(ss[2])
if !ok {
d.sendError(errors.New(d.idtag + "malformed HTTP version"))
return terminateState
}
r.ProtoMajor = major
r.ProtoMinor = minor
r.Proto = ss[2]
return readHeaderState
}
func readHeaderState(d *httpDecoder) httpState {
log.V(2).Infoln(d.idtag + "read-header-state")
if !d.setReadTimeoutOrFail() {
return terminateState
}
r := d.req
tr := textproto.NewReader(d.rw.Reader)
h, err := tr.ReadMIMEHeader()
// merge any headers that were read successfully (before a possible error)
for k, v := range h {
if rh, exists := r.Header[k]; exists {
r.Header[k] = append(rh, v...)
} else {
r.Header[k] = v
}
log.V(2).Infoln(d.idtag+"request header", k, v)
}
if next, ok := d.checkTimeoutOrFail(err, readHeaderState); ok {
return next
}
// special headers: Host, Content-Length, Transfer-Encoding
r.Host = r.Header.Get("Host")
r.TransferEncoding = r.Header["Transfer-Encoding"]
if cl := r.Header.Get("Content-Length"); cl != "" {
l, err := strconv.ParseInt(cl, 10, 64)
if err != nil {
d.sendError(err)
return terminateState
}
if l > -1 {
r.ContentLength = l
log.V(2).Infoln(d.idtag+"set content length", r.ContentLength)
}
}
d.updateForRequest(false)
return d.readBodyContent()
}

View File

@ -1,7 +0,0 @@
/*
Package messenger includes a messenger and a transporter.
The messenger provides interfaces to send a protobuf message
through the underlying transporter. It also dispatches messages
to installed handlers.
*/
package messenger

View File

@ -1,598 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package messenger
import (
"bytes"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
const (
DefaultReadTimeout = 10 * time.Second
DefaultWriteTimeout = 10 * time.Second
)
var (
ReadTimeout = DefaultReadTimeout
WriteTimeout = DefaultWriteTimeout
discardOnStopError = fmt.Errorf("discarding message because transport is shutting down")
errNotStarted = errors.New("HTTP transport has not been started")
errTerminal = errors.New("HTTP transport is terminated")
errAlreadyRunning = errors.New("HTTP transport is already running")
httpTransport = http.Transport{
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: DefaultReadTimeout,
}
// HttpClient is used for sending messages to remote processes
HttpClient = http.Client{
Timeout: DefaultReadTimeout,
}
)
// httpTransporter is a subset of the Transporter interface
type httpTransporter interface {
Send(ctx context.Context, msg *Message) error
Recv() (*Message, error)
Install(messageName string)
Start() (upid.UPID, <-chan error)
Stop(graceful bool) error
}
type notStartedState struct {
h *HTTPTransporter
}
type stoppedState struct{}
type runningState struct {
*notStartedState
}
/* -- not-started state */
func (s *notStartedState) Send(ctx context.Context, msg *Message) error { return errNotStarted }
func (s *notStartedState) Recv() (*Message, error) { return nil, errNotStarted }
func (s *notStartedState) Stop(graceful bool) error { return errNotStarted }
func (s *notStartedState) Install(messageName string) { s.h.install(messageName) }
func (s *notStartedState) Start() (upid.UPID, <-chan error) {
s.h.state = &runningState{s}
return s.h.start()
}
/* -- stopped state */
func (s *stoppedState) Send(ctx context.Context, msg *Message) error { return errTerminal }
func (s *stoppedState) Recv() (*Message, error) { return nil, errTerminal }
func (s *stoppedState) Stop(graceful bool) error { return errTerminal }
func (s *stoppedState) Install(messageName string) {}
func (s *stoppedState) Start() (upid.UPID, <-chan error) {
ch := make(chan error, 1)
ch <- errTerminal
return upid.UPID{}, ch
}
/* -- running state */
func (s *runningState) Send(ctx context.Context, msg *Message) error { return s.h.send(ctx, msg) }
func (s *runningState) Recv() (*Message, error) { return s.h.recv() }
func (s *runningState) Stop(graceful bool) error {
s.h.state = &stoppedState{}
return s.h.stop(graceful)
}
func (s *runningState) Start() (upid.UPID, <-chan error) {
ch := make(chan error, 1)
ch <- errAlreadyRunning
return upid.UPID{}, ch
}
// httpOpt is a functional option type
type httpOpt func(*HTTPTransporter)
// HTTPTransporter implements the interfaces of the Transporter.
type HTTPTransporter struct {
// If the host is empty("") then it will listen on localhost.
// If the port is empty("") then it will listen on random port.
upid upid.UPID
listener net.Listener // TODO(yifan): Change to TCPListener.
mux *http.ServeMux
tr *http.Transport
client *http.Client
messageQueue chan *Message
address net.IP // optional binding address
shouldQuit chan struct{}
stateLock sync.RWMutex // protect lifecycle (start/stop) funcs
state httpTransporter
server *http.Server
}
// NewHTTPTransporter creates a new http transporter with an optional binding address.
func NewHTTPTransporter(upid upid.UPID, address net.IP, opts ...httpOpt) *HTTPTransporter {
transport := httpTransport
client := HttpClient
client.Transport = &transport
mux := http.NewServeMux()
result := &HTTPTransporter{
upid: upid,
messageQueue: make(chan *Message, defaultQueueSize),
mux: mux,
client: &client,
tr: &transport,
address: address,
shouldQuit: make(chan struct{}),
server: &http.Server{
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
Handler: mux,
},
}
for _, f := range opts {
f(result)
}
result.state = &notStartedState{result}
return result
}
func ServerTLSConfig(config *tls.Config, nextProto map[string]func(*http.Server, *tls.Conn, http.Handler)) httpOpt {
return func(transport *HTTPTransporter) {
transport.server.TLSConfig = config
transport.server.TLSNextProto = nextProto
}
}
func ClientTLSConfig(config *tls.Config, handshakeTimeout time.Duration) httpOpt {
return func(transport *HTTPTransporter) {
transport.tr.TLSClientConfig = config
transport.tr.TLSHandshakeTimeout = handshakeTimeout
}
}
func (t *HTTPTransporter) getState() httpTransporter {
t.stateLock.RLock()
defer t.stateLock.RUnlock()
return t.state
}
// Send sends the message to its specified upid.
func (t *HTTPTransporter) Send(ctx context.Context, msg *Message) (sendError error) {
return t.getState().Send(ctx, msg)
}
type mesosError struct {
errorCode int
upid string
uri string
status string
}
func (e *mesosError) Error() string {
return fmt.Sprintf("master %s rejected %s, returned status %q",
e.upid, e.uri, e.status)
}
type networkError struct {
cause error
}
func (e *networkError) Error() string {
return e.cause.Error()
}
// send delivers a message to a mesos component via HTTP, returns a mesosError if the
// communication with the remote process was successful but rejected. A networkError
// error indicates that communication with the remote process failed at the network layer.
func (t *HTTPTransporter) send(ctx context.Context, msg *Message) (sendError error) {
log.V(2).Infof("Sending message to %v via http\n", msg.UPID)
req, err := t.makeLibprocessRequest(msg)
if err != nil {
return err
}
return t.httpDo(ctx, req, func(resp *http.Response, err error) error {
if err != nil {
log.V(1).Infof("Failed to POST: %v\n", err)
return &networkError{err}
}
defer resp.Body.Close()
// ensure master acknowledgement.
if (resp.StatusCode != http.StatusOK) && (resp.StatusCode != http.StatusAccepted) {
return &mesosError{
errorCode: resp.StatusCode,
upid: msg.UPID.String(),
uri: msg.RequestURI(),
status: resp.Status,
}
}
return nil
})
}
func (t *HTTPTransporter) httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.shouldQuit:
return discardOnStopError
default: // continue
}
c := make(chan error, 1)
go func() { c <- f(t.client.Do(req)) }()
select {
case <-ctx.Done():
t.tr.CancelRequest(req)
<-c // Wait for f to return.
return ctx.Err()
case err := <-c:
return err
case <-t.shouldQuit:
t.tr.CancelRequest(req)
<-c // Wait for f to return.
return discardOnStopError
}
}
// Recv returns the message, one at a time.
func (t *HTTPTransporter) Recv() (*Message, error) {
return t.getState().Recv()
}
func (t *HTTPTransporter) recv() (*Message, error) {
select {
default:
select {
case msg := <-t.messageQueue:
return msg, nil
case <-t.shouldQuit:
}
case <-t.shouldQuit:
}
return nil, discardOnStopError
}
// Install the request URI according to the message's name.
func (t *HTTPTransporter) Install(msgName string) {
t.getState().Install(msgName)
}
func (t *HTTPTransporter) install(msgName string) {
requestURI := fmt.Sprintf("/%s/%s", t.upid.ID, msgName)
t.mux.HandleFunc(requestURI, t.messageDecoder)
}
type loggedListener struct {
delegate net.Listener
done <-chan struct{}
}
func (l *loggedListener) Accept() (c net.Conn, err error) {
c, err = l.delegate.Accept()
if c != nil {
log.Infoln("accepted connection from", c.RemoteAddr())
c = logConnection(c)
} else if err != nil {
select {
case <-l.done:
default:
log.Errorln("failed to accept connection:", err.Error())
}
}
return
}
func (l *loggedListener) Close() (err error) {
err = l.delegate.Close()
if err != nil {
select {
case <-l.done:
default:
log.Errorln("error closing listener:", err.Error())
}
} else {
log.Infoln("closed listener")
}
return
}
func (l *loggedListener) Addr() net.Addr { return l.delegate.Addr() }
func logConnection(c net.Conn) net.Conn {
w := hex.Dumper(os.Stdout)
r := io.TeeReader(c, w)
return &loggedConnection{
Conn: c,
reader: r,
}
}
type loggedConnection struct {
net.Conn
reader io.Reader
}
func (c *loggedConnection) Read(b []byte) (int, error) {
return c.reader.Read(b)
}
// Listen starts listen on UPID. If UPID is empty, the transporter
// will listen on a random port, and then fill the UPID with the
// host:port it is listening.
func (t *HTTPTransporter) listen() error {
var host string
if t.address != nil {
host = t.address.String()
} else {
host = t.upid.Host
}
var port string
if t.upid.Port != "" {
port = t.upid.Port
} else {
port = "0"
}
// NOTE: Explicitly specifies IPv4 because Libprocess
// only supports IPv4 for now.
ln, err := net.Listen("tcp4", net.JoinHostPort(host, port))
if err != nil {
log.Errorf("HTTPTransporter failed to listen: %v\n", err)
return err
}
// Save the host:port in case they are not specified in upid.
host, port, _ = net.SplitHostPort(ln.Addr().String())
log.Infoln("listening on", host, "port", port)
if len(t.upid.Host) == 0 {
t.upid.Host = host
}
if len(t.upid.Port) == 0 || t.upid.Port == "0" {
t.upid.Port = port
}
if log.V(3) {
t.listener = &loggedListener{delegate: ln, done: t.shouldQuit}
} else {
t.listener = ln
}
return nil
}
// Start starts the http transporter
func (t *HTTPTransporter) Start() (upid.UPID, <-chan error) {
t.stateLock.Lock()
defer t.stateLock.Unlock()
return t.state.Start()
}
// start expects to be guarded by stateLock
func (t *HTTPTransporter) start() (upid.UPID, <-chan error) {
ch := make(chan error, 1)
if err := t.listen(); err != nil {
ch <- err
return upid.UPID{}, ch
}
// TODO(yifan): Set read/write deadline.
go func() {
err := t.server.Serve(t.listener)
select {
case <-t.shouldQuit:
log.V(1).Infof("HTTP server stopped because of shutdown")
ch <- nil
default:
if err != nil && log.V(1) {
log.Errorln("HTTP server stopped with error", err.Error())
} else {
log.V(1).Infof("HTTP server stopped")
}
ch <- err
t.Stop(false)
}
}()
return t.upid, ch
}
// Stop stops the http transporter by closing the listener.
func (t *HTTPTransporter) Stop(graceful bool) error {
t.stateLock.Lock()
defer t.stateLock.Unlock()
return t.state.Stop(graceful)
}
// stop expects to be guarded by stateLock
func (t *HTTPTransporter) stop(graceful bool) error {
close(t.shouldQuit)
log.Info("stopping HTTP transport")
//TODO(jdef) if graceful, wait for pending requests to terminate
err := t.listener.Close()
return err
}
// UPID returns the upid of the transporter.
func (t *HTTPTransporter) UPID() upid.UPID {
t.stateLock.Lock()
defer t.stateLock.Unlock()
return t.upid
}
func (t *HTTPTransporter) messageDecoder(w http.ResponseWriter, r *http.Request) {
// Verify it's a libprocess request.
from, err := getLibprocessFrom(r)
if err != nil {
log.Errorf("Ignoring the request, because it's not a libprocess request: %v\n", err)
w.WriteHeader(http.StatusBadRequest)
return
}
decoder := DecodeHTTP(w, r)
defer decoder.Cancel(true)
t.processRequests(from, decoder.Requests())
// log an error if there's one waiting, otherwise move on
select {
case err, ok := <-decoder.Err():
if ok {
log.Errorf("failed to decode HTTP message: %v", err)
}
default:
}
}
func (t *HTTPTransporter) processRequests(from *upid.UPID, incoming <-chan *Request) {
for {
select {
case r, ok := <-incoming:
if !ok || !t.processOneRequest(from, r) {
return
}
case <-t.shouldQuit:
return
}
}
}
func (t *HTTPTransporter) processOneRequest(from *upid.UPID, request *Request) (keepGoing bool) {
// regardless of whether we write a Response we must close this chan
defer close(request.response)
keepGoing = true
//TODO(jdef) this is probably inefficient given the current implementation of the
// decoder: no need to make another copy of data that's already competely buffered
data, err := ioutil.ReadAll(request.Body)
if err != nil {
// this is unlikely given the current implementation of the decoder:
// the body has been completely buffered in memory already
log.Errorf("failed to read HTTP body: %v", err)
return
}
log.V(2).Infof("Receiving %q %v from %v, length %v", request.Method, request.URL, from, len(data))
m := &Message{
UPID: from,
Name: extractNameFromRequestURI(request.RequestURI),
Bytes: data,
}
// deterministic behavior and output..
select {
case <-t.shouldQuit:
keepGoing = false
select {
case t.messageQueue <- m:
default:
}
case t.messageQueue <- m:
select {
case <-t.shouldQuit:
keepGoing = false
default:
}
}
// Only send back an HTTP response if this isn't from libprocess
// (which we determine by looking at the User-Agent). This is
// necessary because older versions of libprocess would try and
// recv the data and parse it as an HTTP request which would
// fail thus causing the socket to get closed (but now
// libprocess will ignore responses, see ignore_data).
// see https://github.com/apache/mesos/blob/adecbfa6a216815bd7dc7d26e721c4c87e465c30/3rdparty/libprocess/src/process.cpp#L2192
if _, ok := parseLibprocessAgent(request.Header); !ok {
log.V(2).Infof("not libprocess agent, sending a 202")
request.response <- Response{
code: 202,
reason: "Accepted",
} // should never block
}
return
}
func (t *HTTPTransporter) makeLibprocessRequest(msg *Message) (*http.Request, error) {
if msg.UPID == nil {
panic(fmt.Sprintf("message is missing UPID: %+v", msg))
}
hostport := net.JoinHostPort(msg.UPID.Host, msg.UPID.Port)
targetURL := fmt.Sprintf("http://%s%s", hostport, msg.RequestURI())
log.V(2).Infof("libproc target URL %s", targetURL)
req, err := http.NewRequest("POST", targetURL, bytes.NewReader(msg.Bytes))
if err != nil {
log.V(1).Infof("Failed to create request: %v\n", err)
return nil, err
}
if !msg.isV1API() {
req.Header.Add("Libprocess-From", t.upid.String())
req.Header.Add("Connection", "Keep-Alive")
}
req.Header.Add("Content-Type", "application/x-protobuf")
return req, nil
}
func getLibprocessFrom(r *http.Request) (*upid.UPID, error) {
if r.Method != "POST" {
return nil, fmt.Errorf("Not a POST request")
}
if agent, ok := parseLibprocessAgent(r.Header); ok {
return upid.Parse(agent)
}
lf, ok := r.Header["Libprocess-From"]
if ok {
// TODO(yifan): Just take the first field for now.
return upid.Parse(lf[0])
}
return nil, fmt.Errorf("Cannot find 'User-Agent' or 'Libprocess-From'")
}
func parseLibprocessAgent(h http.Header) (string, bool) {
const prefix = "libprocess/"
if ua, ok := h["User-Agent"]; ok {
for _, agent := range ua {
if strings.HasPrefix(agent, prefix) {
return agent[len(prefix):], true
}
}
}
return "", false
}

View File

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package messenger
import (
"fmt"
"strings"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/upid"
)
// Message defines the type that passes in the Messenger.
type Message struct {
UPID *upid.UPID
Name string
ProtoMessage proto.Message
Bytes []byte
}
// RequestURI returns the request URI of the message.
func (m *Message) RequestURI() string {
if m.isV1API() {
return fmt.Sprintf("/api/v1/%s", m.Name)
}
return fmt.Sprintf("/%s/%s", m.UPID.ID, m.Name)
}
func (m *Message) isV1API() bool {
return !strings.HasPrefix(m.Name, "mesos.internal")
}
// NOTE: This should not fail or panic.
func extractNameFromRequestURI(requestURI string) string {
return strings.Split(requestURI, "/")[2]
}

View File

@ -1,417 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package messenger
import (
"fmt"
"net"
"reflect"
"strconv"
"sync"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosproto/scheduler"
"github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger/sessionid"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
const (
defaultQueueSize = 1024
)
// MessageHandler is the callback of the message. When the callback
// is invoked, the sender's upid and the message is passed to the callback.
type MessageHandler func(from *upid.UPID, pbMsg proto.Message)
// Messenger defines the interfaces that should be implemented.
type Messenger interface {
Install(handler MessageHandler, msg proto.Message) error
Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error
Route(ctx context.Context, from *upid.UPID, msg proto.Message) error
Start() error
Stop() error
UPID() upid.UPID
}
type errorHandlerFunc func(context.Context, *Message, error) error
type dispatchFunc func(errorHandlerFunc)
// MesosMessenger is an implementation of the Messenger interface.
type MesosMessenger struct {
upid upid.UPID
sendingQueue chan dispatchFunc
installedMessages map[string]reflect.Type
installedHandlers map[string]MessageHandler
stop chan struct{}
stopOnce sync.Once
tr Transporter
guardHandlers sync.RWMutex // protect simultaneous changes to messages/handlers maps
}
// ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to
// determine the binding-address used for both the UPID.Host and Transport binding address.
func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16, publishedAddress net.IP) (Messenger, error) {
upid := upid.UPID{
ID: proc.Label(),
Port: strconv.Itoa(int(port)),
}
host, err := UPIDBindingAddress(hostname, bindingAddress)
if err != nil {
return nil, err
}
var publishedHost string
if publishedAddress != nil {
publishedHost, err = UPIDBindingAddress(hostname, publishedAddress)
if err != nil {
return nil, err
}
}
if publishedHost != "" {
upid.Host = publishedHost
} else {
upid.Host = host
}
return NewHttpWithBindingAddress(upid, bindingAddress), nil
}
// UPIDBindingAddress determines the value of UPID.Host that will be used to build
// a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used
// for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP
// address and the UPID.Host is set to that address and the bindingAddress is passed through
// to the Transport.
func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) {
upidHost := ""
if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() {
upidHost = bindingAddress.String()
} else {
if hostname == "" || hostname == "0.0.0.0" {
return "", fmt.Errorf("invalid hostname (%q) specified with binding address %v", hostname, bindingAddress)
}
ip := net.ParseIP(hostname)
if ip != nil {
ip = ip.To4()
}
if ip == nil {
ips, err := net.LookupIP(hostname)
if err != nil {
return "", err
}
// try to find an ipv4 and use that
for _, addr := range ips {
if ip = addr.To4(); ip != nil {
break
}
}
if ip == nil {
// no ipv4? best guess, just take the first addr
if len(ips) > 0 {
ip = ips[0]
log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip)
} else {
return "", fmt.Errorf("failed to determine IP address for host '%v'", hostname)
}
}
}
upidHost = ip.String()
}
return upidHost, nil
}
// NewMesosMessenger creates a new mesos messenger.
func NewHttp(upid upid.UPID, opts ...httpOpt) *MesosMessenger {
return NewHttpWithBindingAddress(upid, nil, opts...)
}
func NewHttpWithBindingAddress(upid upid.UPID, address net.IP, opts ...httpOpt) *MesosMessenger {
return New(NewHTTPTransporter(upid, address, opts...))
}
func New(t Transporter) *MesosMessenger {
return &MesosMessenger{
sendingQueue: make(chan dispatchFunc, defaultQueueSize),
installedMessages: make(map[string]reflect.Type),
installedHandlers: make(map[string]MessageHandler),
tr: t,
}
}
/// Install installs the handler with the given message.
func (m *MesosMessenger) Install(handler MessageHandler, msg proto.Message) error {
// Check if the message is a pointer.
mtype := reflect.TypeOf(msg)
if mtype.Kind() != reflect.Ptr {
return fmt.Errorf("Message %v is not a Ptr type", msg)
}
// Check if the message is already installed.
name := getMessageName(msg)
if _, ok := m.installedMessages[name]; ok {
return fmt.Errorf("Message %v is already installed", name)
}
m.guardHandlers.Lock()
defer m.guardHandlers.Unlock()
m.installedMessages[name] = mtype.Elem()
m.installedHandlers[name] = handler
m.tr.Install(name)
return nil
}
// Send puts a message into the outgoing queue, waiting to be sent.
// With buffered channels, this will not block under moderate throughput.
// When an error is generated, the error can be communicated by placing
// a message on the incoming queue to be handled upstream.
func (m *MesosMessenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
if upid == nil {
panic("cannot sent a message to a nil pid")
} else if *upid == m.upid {
return fmt.Errorf("Send the message to self")
}
b, err := proto.Marshal(msg)
if err != nil {
return err
}
name := getMessageName(msg)
log.V(2).Infof("Sending message %v to %v\n", name, upid)
wrapped := &Message{upid, name, msg, b}
d := dispatchFunc(func(rf errorHandlerFunc) {
err := m.tr.Send(ctx, wrapped)
err = rf(ctx, wrapped, err)
if err != nil {
m.reportError("send", wrapped, err)
}
})
select {
case <-ctx.Done():
return ctx.Err()
case m.sendingQueue <- d:
return nil
}
}
// Route puts a message either in the incoming or outgoing queue.
// This method is useful for:
// 1) routing internal error to callback handlers
// 2) testing components without starting remote servers.
func (m *MesosMessenger) Route(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
if upid == nil {
panic("cannot route a message to a nil pid")
} else if *upid != m.upid {
// if destination is not self, send to outbound.
return m.Send(ctx, upid, msg)
}
name := getMessageName(msg)
log.V(2).Infof("routing message %q to self", name)
_, handler, ok := m.messageBinding(name)
if !ok {
return fmt.Errorf("failed to route message, no message binding for %q", name)
}
// the implication of this is that messages can be delivered to self even if the
// messenger has been stopped. is that OK?
go handler(upid, msg)
return nil
}
// Start starts the messenger; expects to be called once and only once.
func (m *MesosMessenger) Start() error {
m.stop = make(chan struct{})
pid, errChan := m.tr.Start()
if pid == (upid.UPID{}) {
err := <-errChan
return fmt.Errorf("failed to start messenger: %v", err)
}
// the pid that we're actually bound as
m.upid = pid
go m.sendLoop()
go m.decodeLoop()
// wait for a listener error or a stop signal; either way stop the messenger
// TODO(jdef) a better implementation would attempt to re-listen; need to coordinate
// access to m.upid in that case. probably better off with a state machine instead of
// what we have now.
go func() {
select {
case err := <-errChan:
if err != nil {
//TODO(jdef) should the driver abort in this case? probably
//since this messenger will never attempt to re-establish the
//transport
log.Errorln("transport stopped unexpectedly:", err.Error())
}
err = m.Stop()
if err != nil && err != errTerminal {
log.Errorln("failed to stop messenger cleanly: ", err.Error())
}
case <-m.stop:
}
}()
return nil
}
// Stop stops the messenger and clean up all the goroutines.
func (m *MesosMessenger) Stop() (err error) {
m.stopOnce.Do(func() {
select {
case <-m.stop:
default:
defer close(m.stop)
}
log.Infof("stopping messenger %v..", m.upid)
//TODO(jdef) don't hardcode the graceful flag here
if err2 := m.tr.Stop(true); err2 != nil && err2 != errTerminal {
log.Warningf("failed to stop the transporter: %v\n", err2)
err = err2
}
})
return
}
// UPID returns the upid of the messenger.
func (m *MesosMessenger) UPID() upid.UPID {
return m.upid
}
func (m *MesosMessenger) reportError(action string, msg *Message, err error) {
// log message transmission errors but don't shoot the messenger.
// this approach essentially drops all undelivered messages on the floor.
name := ""
if msg != nil {
name = msg.Name
}
log.Errorf("failed to %s message %q: %+v", action, name, err)
}
func (m *MesosMessenger) sendLoop() {
for {
select {
case <-m.stop:
return
case f := <-m.sendingQueue:
f(errorHandlerFunc(func(ctx context.Context, msg *Message, err error) error {
if _, ok := err.(*networkError); ok {
// if transport reports a network error, then
// we're probably disconnected from the remote process?
pid := msg.UPID.String()
neterr := &mesos.InternalNetworkError{Pid: &pid}
sessionID, ok := sessionid.FromContext(ctx)
if ok {
neterr.Session = &sessionID
}
log.V(1).Infof("routing network error for pid %q session %q", pid, sessionID)
err2 := m.Route(ctx, &m.upid, neterr)
if err2 != nil {
log.Error(err2)
} else {
log.V(1).Infof("swallowing raw error because we're reporting a networkError: %v", err)
return nil
}
}
return err
}))
}
}
}
// Since HTTPTransporter.Recv() is already buffered, so we don't need a 'recvLoop' here.
func (m *MesosMessenger) decodeLoop() {
for {
select {
case <-m.stop:
return
default:
}
msg, err := m.tr.Recv()
if err != nil {
if err == discardOnStopError {
log.V(1).Info("exiting decodeLoop, transport shutting down")
return
} else {
panic(fmt.Sprintf("unexpected transport error: %v", err))
}
}
log.V(2).Infof("Receiving message %v from %v\n", msg.Name, msg.UPID)
protoMessage, handler, found := m.messageBinding(msg.Name)
if !found {
log.Warningf("no message binding for message %q", msg.Name)
continue
}
msg.ProtoMessage = protoMessage
if err := proto.Unmarshal(msg.Bytes, msg.ProtoMessage); err != nil {
log.Errorf("Failed to unmarshal message %v: %v\n", msg, err)
continue
}
handler(msg.UPID, msg.ProtoMessage)
}
}
func (m *MesosMessenger) messageBinding(name string) (proto.Message, MessageHandler, bool) {
m.guardHandlers.RLock()
defer m.guardHandlers.RUnlock()
gotype, ok := m.installedMessages[name]
if !ok {
return nil, nil, false
}
handler, ok := m.installedHandlers[name]
if !ok {
return nil, nil, false
}
protoMessage := reflect.New(gotype).Interface().(proto.Message)
return protoMessage, handler, true
}
// getMessageName returns the name of the message in the mesos manner.
func getMessageName(msg proto.Message) string {
var msgName string
switch msg := msg.(type) {
case *scheduler.Call:
msgName = "scheduler"
default:
msgName = fmt.Sprintf("%v.%v", "mesos.internal", reflect.TypeOf(msg).Elem().Name())
}
return msgName
}

View File

@ -1,18 +0,0 @@
package sessionid
import (
"golang.org/x/net/context"
)
type key int
const sessionIDKey = 0
func NewContext(ctx context.Context, sessionID string) context.Context {
return context.WithValue(ctx, sessionIDKey, sessionID)
}
func FromContext(ctx context.Context) (string, bool) {
sessionID, ok := ctx.Value(sessionIDKey).(string)
return sessionID, ok
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package messenger
import (
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
// Transporter defines methods for communicating with remote processes.
type Transporter interface {
//Send sends message to remote process. Must use context to determine
//cancelled requests. Will stop sending when transport is stopped.
Send(ctx context.Context, msg *Message) error
//Rcvd receives and delegate message handling to installed handlers.
//Will stop receiving when transport is stopped.
Recv() (*Message, error)
//Install mount an handler based on incoming message name.
Install(messageName string)
//Start starts the transporter and returns immediately. The error chan
//is never nil.
Start() (upid.UPID, <-chan error)
//Stop kills the transporter.
Stop(graceful bool) error
//UPID returns the PID for transporter.
UPID() upid.UPID
}

View File

@ -1,6 +0,0 @@
/*
Package scheduler includes the interfaces for the mesos scheduler and
the mesos executor driver. It also contains as well as an implementation
of the driver that you can use in your code.
*/
package scheduler

View File

@ -1,29 +0,0 @@
package scheduler
import (
"github.com/mesos/mesos-go/auth/callback"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/upid"
)
type CredentialHandler struct {
pid *upid.UPID // the process to authenticate against (master)
client *upid.UPID // the process to be authenticated (slave / framework)
credential *mesos.Credential
}
func (h *CredentialHandler) Handle(callbacks ...callback.Interface) error {
for _, cb := range callbacks {
switch cb := cb.(type) {
case *callback.Name:
cb.Set(h.credential.GetPrincipal())
case *callback.Password:
cb.Set(([]byte)(h.credential.GetSecret()))
case *callback.Interprocess:
cb.Set(*(h.pid), *(h.client))
default:
return &callback.Unsupported{Callback: cb}
}
}
return nil
}

View File

@ -1,7 +0,0 @@
package scheduler
import (
_ "github.com/mesos/mesos-go/auth/sasl"
_ "github.com/mesos/mesos-go/auth/sasl/mech/crammd5"
_ "github.com/mesos/mesos-go/detector/zoo"
)

View File

@ -1,96 +0,0 @@
package scheduler
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/upid"
"sync"
)
type cachedOffer struct {
offer *mesos.Offer
slavePid *upid.UPID
}
func newCachedOffer(offer *mesos.Offer, slavePid *upid.UPID) *cachedOffer {
return &cachedOffer{offer: offer, slavePid: slavePid}
}
// schedCache a managed cache with backing maps to store offeres
// and tasked slaves.
type schedCache struct {
lock sync.RWMutex
savedOffers map[string]*cachedOffer // current offers key:OfferID
savedSlavePids map[string]*upid.UPID // Current saved slaves, key:slaveId
}
func newSchedCache() *schedCache {
return &schedCache{
savedOffers: make(map[string]*cachedOffer),
savedSlavePids: make(map[string]*upid.UPID),
}
}
// putOffer stores an offer and the slavePID associated with offer.
func (cache *schedCache) putOffer(offer *mesos.Offer, pid *upid.UPID) {
if offer == nil || pid == nil {
log.V(3).Infoln("WARN: Offer not cached. The offer or pid cannot be nil")
return
}
log.V(3).Infoln("Caching offer ", offer.Id.GetValue(), " with slavePID ", pid.String())
cache.lock.Lock()
cache.savedOffers[offer.Id.GetValue()] = &cachedOffer{offer: offer, slavePid: pid}
cache.lock.Unlock()
}
// getOffer returns cached offer
func (cache *schedCache) getOffer(offerId *mesos.OfferID) *cachedOffer {
if offerId == nil {
log.V(3).Infoln("WARN: OfferId == nil, returning nil")
return nil
}
cache.lock.RLock()
defer cache.lock.RUnlock()
return cache.savedOffers[offerId.GetValue()]
}
// containsOff test cache for offer(offerId)
func (cache *schedCache) containsOffer(offerId *mesos.OfferID) bool {
cache.lock.RLock()
defer cache.lock.RUnlock()
_, ok := cache.savedOffers[offerId.GetValue()]
return ok
}
func (cache *schedCache) removeOffer(offerId *mesos.OfferID) {
cache.lock.Lock()
delete(cache.savedOffers, offerId.GetValue())
cache.lock.Unlock()
}
func (cache *schedCache) putSlavePid(slaveId *mesos.SlaveID, pid *upid.UPID) {
cache.lock.Lock()
cache.savedSlavePids[slaveId.GetValue()] = pid
cache.lock.Unlock()
}
func (cache *schedCache) getSlavePid(slaveId *mesos.SlaveID) *upid.UPID {
if slaveId == nil {
log.V(3).Infoln("SlaveId == nil, returning empty UPID")
return nil
}
return cache.savedSlavePids[slaveId.GetValue()]
}
func (cache *schedCache) containsSlavePid(slaveId *mesos.SlaveID) bool {
cache.lock.RLock()
defer cache.lock.RUnlock()
_, ok := cache.savedSlavePids[slaveId.GetValue()]
return ok
}
func (cache *schedCache) removeSlavePid(slaveId *mesos.SlaveID) {
cache.lock.Lock()
delete(cache.savedSlavePids, slaveId.GetValue())
cache.lock.Unlock()
}

View File

@ -1,196 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package scheduler
import (
mesos "github.com/mesos/mesos-go/mesosproto"
)
// Interface for connecting a scheduler to Mesos. This
// interface is used both to manage the scheduler's lifecycle (start
// it, stop it, or wait for it to finish) and to interact with Mesos
// (e.g., launch tasks, kill tasks, etc.).
// See the MesosSchedulerDriver type for a concrete
// impl of a SchedulerDriver.
type SchedulerDriver interface {
// Starts the scheduler driver. This needs to be called before any
// other driver calls are made.
Start() (mesos.Status, error)
// Stops the scheduler driver. If the 'failover' flag is set to
// false then it is expected that this framework will never
// reconnect to Mesos and all of its executors and tasks can be
// terminated. Otherwise, all executors and tasks will remain
// running (for some framework specific failover timeout) allowing the
// scheduler to reconnect (possibly in the same process, or from a
// different process, for example, on a different machine).
Stop(failover bool) (mesos.Status, error)
// Aborts the driver so that no more callbacks can be made to the
// scheduler. The semantics of abort and stop have deliberately been
// separated so that code can detect an aborted driver (i.e., via
// the return status of SchedulerDriver::join, see below), and
// instantiate and start another driver if desired (from within the
// same process). Note that 'Stop()' is not automatically called
// inside 'Abort()'.
Abort() (mesos.Status, error)
// Waits for the driver to be stopped or aborted, possibly
// _blocking_ the current thread indefinitely. The return status of
// this function can be used to determine if the driver was aborted
// (see mesos.proto for a description of Status).
Join() (mesos.Status, error)
// Starts and immediately joins (i.e., blocks on) the driver.
Run() (mesos.Status, error)
// Requests resources from Mesos (see mesos.proto for a description
// of Request and how, for example, to request resources
// from specific slaves). Any resources available are offered to the
// framework via Scheduler.ResourceOffers callback, asynchronously.
RequestResources(requests []*mesos.Request) (mesos.Status, error)
// AcceptOffers utilizes the new HTTP API to send a Scheduler Call Message
// to the Mesos Master. Valid operation types are LAUNCH, RESERVE, UNRESERVE,
// CREATE, DESTROY, and more.
AcceptOffers(offerIDs []*mesos.OfferID, operations []*mesos.Offer_Operation, filters *mesos.Filters) (mesos.Status, error)
// Launches the given set of tasks. Any resources remaining (i.e.,
// not used by the tasks or their executors) will be considered
// declined. The specified filters are applied on all unused
// resources (see mesos.proto for a description of Filters).
// Available resources are aggregated when mutiple offers are
// provided. Note that all offers must belong to the same slave.
// Invoking this function with an empty collection of tasks declines
// offers in their entirety (see Scheduler::declineOffer).
LaunchTasks(offerIDs []*mesos.OfferID, tasks []*mesos.TaskInfo, filters *mesos.Filters) (mesos.Status, error)
// Kills the specified task. Note that attempting to kill a task is
// currently not reliable. If, for example, a scheduler fails over
// while it was attempting to kill a task it will need to retry in
// the future. Likewise, if unregistered / disconnected, the request
// will be dropped (these semantics may be changed in the future).
KillTask(taskID *mesos.TaskID) (mesos.Status, error)
// Declines an offer in its entirety and applies the specified
// filters on the resources (see mesos.proto for a description of
// Filters). Note that this can be done at any time, it is not
// necessary to do this within the Scheduler::resourceOffers
// callback.
DeclineOffer(offerID *mesos.OfferID, filters *mesos.Filters) (mesos.Status, error)
// Removes all filters previously set by the framework (via
// LaunchTasks()). This enables the framework to receive offers from
// those filtered slaves.
ReviveOffers() (mesos.Status, error)
// Sends a message from the framework to one of its executors. These
// messages are best effort; do not expect a framework message to be
// retransmitted in any reliable fashion.
SendFrameworkMessage(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, data string) (mesos.Status, error)
// Allows the framework to query the status for non-terminal tasks.
// This causes the master to send back the latest task status for
// each task in 'statuses', if possible. Tasks that are no longer
// known will result in a TASK_LOST update. If statuses is empty,
// then the master will send the latest status for each task
// currently known.
ReconcileTasks(statuses []*mesos.TaskStatus) (mesos.Status, error)
}
// Scheduler a type with callback attributes to be provided by frameworks
// schedulers.
//
// Each callback includes a reference to the scheduler driver that was
// used to run this scheduler. The pointer will not change for the
// duration of a scheduler (i.e., from the point you do
// SchedulerDriver.Start() to the point that SchedulerDriver.Stop()
// returns). This is intended for convenience so that a scheduler
// doesn't need to store a reference to the driver itself.
type Scheduler interface {
// Invoked when the scheduler successfully registers with a Mesos
// master. A unique ID (generated by the master) used for
// distinguishing this framework from others and MasterInfo
// with the ip and port of the current master are provided as arguments.
Registered(SchedulerDriver, *mesos.FrameworkID, *mesos.MasterInfo)
// Invoked when the scheduler re-registers with a newly elected Mesos master.
// This is only called when the scheduler has previously been registered.
// MasterInfo containing the updated information about the elected master
// is provided as an argument.
Reregistered(SchedulerDriver, *mesos.MasterInfo)
// Invoked when the scheduler becomes "disconnected" from the master
// (e.g., the master fails and another is taking over).
Disconnected(SchedulerDriver)
// Invoked when resources have been offered to this framework. A
// single offer will only contain resources from a single slave.
// Resources associated with an offer will not be re-offered to
// _this_ framework until either (a) this framework has rejected
// those resources (see SchedulerDriver::launchTasks) or (b) those
// resources have been rescinded (see Scheduler::offerRescinded).
// Note that resources may be concurrently offered to more than one
// framework at a time (depending on the allocator being used). In
// that case, the first framework to launch tasks using those
// resources will be able to use them while the other frameworks
// will have those resources rescinded (or if a framework has
// already launched tasks with those resources then those tasks will
// fail with a TASK_LOST status and a message saying as much).
ResourceOffers(SchedulerDriver, []*mesos.Offer)
// Invoked when an offer is no longer valid (e.g., the slave was
// lost or another framework used resources in the offer). If for
// whatever reason an offer is never rescinded (e.g., dropped
// message, failing over framework, etc.), a framwork that attempts
// to launch tasks using an invalid offer will receive TASK_LOST
// status updates for those tasks (see Scheduler::resourceOffers).
OfferRescinded(SchedulerDriver, *mesos.OfferID)
// Invoked when the status of a task has changed (e.g., a slave is
// lost and so the task is lost, a task finishes and an executor
// sends a status update saying so, etc). Note that returning from
// this callback _acknowledges_ receipt of this status update! If
// for whatever reason the scheduler aborts during this callback (or
// the process exits) another status update will be delivered (note,
// however, that this is currently not true if the slave sending the
// status update is lost/fails during that time).
StatusUpdate(SchedulerDriver, *mesos.TaskStatus)
// Invoked when an executor sends a message. These messages are best
// effort; do not expect a framework message to be retransmitted in
// any reliable fashion.
FrameworkMessage(SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, string)
// Invoked when a slave has been determined unreachable (e.g.,
// machine failure, network partition). Most frameworks will need to
// reschedule any tasks launched on this slave on a new slave.
SlaveLost(SchedulerDriver, *mesos.SlaveID)
// Invoked when an executor has exited/terminated. Note that any
// tasks running will have TASK_LOST status updates automagically
// generated.
ExecutorLost(SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, int)
// Invoked when there is an unrecoverable error in the scheduler or
// scheduler driver. The driver will be aborted BEFORE invoking this
// callback.
Error(SchedulerDriver, string)
}

File diff suppressed because it is too large Load Diff

View File

@ -1,78 +0,0 @@
package scheduler
import (
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
type TestDriver struct {
*MesosSchedulerDriver
}
func (t *TestDriver) SetConnected(b bool) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.connected = b
}
func (t *TestDriver) Started() <-chan struct{} {
return t.started
}
func (t *TestDriver) Stopped() <-chan struct{} {
return t.stopCh
}
func (t *TestDriver) Done() <-chan struct{} {
return t.done
}
func (t *TestDriver) Framework() *mesos.FrameworkInfo {
return t.frameworkInfo
}
func (t *TestDriver) UPID() *upid.UPID {
return t.self
}
func (t *TestDriver) MasterPID() *upid.UPID {
return t.masterPid
}
func (t *TestDriver) Fatal(ctx context.Context, msg string) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.fatal(ctx, msg)
}
func (t *TestDriver) OnDispatch(f func(ctx context.Context, upid *upid.UPID, msg proto.Message) error) {
t.dispatch = f
}
func (t *TestDriver) HandleMasterChanged(ctx context.Context, from *upid.UPID, msg proto.Message) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.handleMasterChanged(ctx, from, msg)
}
func (t *TestDriver) CacheOffer(offer *mesos.Offer, pid *upid.UPID) {
t.cache.putOffer(offer, pid)
}
func (t *TestDriver) Context() context.Context {
return t.context()
}
func (t *TestDriver) FrameworkRegistered(ctx context.Context, from *upid.UPID, msg proto.Message) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.frameworkRegistered(ctx, from, msg)
}
func (t *TestDriver) FrameworkReregistered(ctx context.Context, from *upid.UPID, msg proto.Message) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.frameworkReregistered(ctx, from, msg)
}