Update mesos godeps to remove transient dependency on testify/mock

This commit is contained in:
Tim St. Clair 2016-02-26 17:26:43 -08:00
parent ce08973e96
commit 9da747dea1
15 changed files with 359 additions and 107 deletions

32
Godeps/Godeps.json generated
View File

@ -765,43 +765,43 @@
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/auth", "ImportPath": "github.com/mesos/mesos-go/auth",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/detector", "ImportPath": "github.com/mesos/mesos-go/detector",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/executor", "ImportPath": "github.com/mesos/mesos-go/executor",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/mesosproto", "ImportPath": "github.com/mesos/mesos-go/mesosproto",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/mesosutil", "ImportPath": "github.com/mesos/mesos-go/mesosutil",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/messenger", "ImportPath": "github.com/mesos/mesos-go/messenger",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/scheduler", "ImportPath": "github.com/mesos/mesos-go/scheduler",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/upid", "ImportPath": "github.com/mesos/mesos-go/upid",
"Comment": "before-0.26-protos-29-gb755e34", "Comment": "before-0.26-protos-33-g45c8b08",
"Rev": "b755e34cdb14a6de15ccb50605786c9ecacbfc1f" "Rev": "45c8b08e9af666add36a6f93ff8c1c75812367b0"
}, },
{ {
"ImportPath": "github.com/miekg/dns", "ImportPath": "github.com/miekg/dns",

View File

@ -0,0 +1,17 @@
package mock
import (
"github.com/gogo/protobuf/proto"
mock_messenger "github.com/mesos/mesos-go/messenger/mock"
"github.com/mesos/mesos-go/upid"
"github.com/stretchr/testify/mock"
"golang.org/x/net/context"
)
type Transport struct {
*mock_messenger.Messenger
}
func (m *Transport) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
return m.Called(mock.Anything, upid, msg).Error(0)
}

View File

@ -22,7 +22,7 @@ var (
plugins = map[string]PluginFactory{} plugins = map[string]PluginFactory{}
EmptySpecError = errors.New("empty master specification") EmptySpecError = errors.New("empty master specification")
defaultFactory = PluginFactory(func(spec string) (Master, error) { defaultFactory = PluginFactory(func(spec string, _ ...Option) (Master, error) {
if len(spec) == 0 { if len(spec) == 0 {
return nil, EmptySpecError return nil, EmptySpecError
} }
@ -37,7 +37,7 @@ var (
}) })
) )
type PluginFactory func(string) (Master, error) type PluginFactory func(string, ...Option) (Master, error)
// associates a plugin implementation with a Master specification prefix. // associates a plugin implementation with a Master specification prefix.
// packages that provide plugins are expected to invoke this func within // packages that provide plugins are expected to invoke this func within
@ -76,7 +76,7 @@ func Register(prefix string, f PluginFactory) error {
// are not yet running and will only begin to spawn requisite background // are not yet running and will only begin to spawn requisite background
// processing upon, or some time after, the first invocation of their Detect. // processing upon, or some time after, the first invocation of their Detect.
// //
func New(spec string) (m Master, err error) { func New(spec string, options ...Option) (m Master, err error) {
if strings.HasPrefix(spec, "file://") { if strings.HasPrefix(spec, "file://") {
var body []byte var body []byte
path := spec[7:] path := spec[7:]
@ -84,12 +84,12 @@ func New(spec string) (m Master, err error) {
if err != nil { if err != nil {
log.V(1).Infof("failed to read from file at '%s'", path) log.V(1).Infof("failed to read from file at '%s'", path)
} else { } else {
m, err = New(string(body)) m, err = New(string(body), options...)
} }
} else if f, ok := MatchingPlugin(spec); ok { } else if f, ok := MatchingPlugin(spec); ok {
m, err = f(spec) m, err = f(spec, options...)
} else { } else {
m, err = defaultFactory(spec) m, err = defaultFactory(spec, options...)
} }
return return

View File

@ -66,3 +66,6 @@ type Master interface {
// Detect() hasn't been invoked yet. // Detect() hasn't been invoked yet.
Cancel() Cancel()
} }
// functional option type for detectors
type Option func(interface{}) Option

View File

@ -9,7 +9,7 @@ import (
const ( const (
defaultSessionTimeout = 60 * time.Second defaultSessionTimeout = 60 * time.Second
currentPath = "." CurrentPath = "."
) )
var zkSessionTimeout = defaultSessionTimeout var zkSessionTimeout = defaultSessionTimeout
@ -43,25 +43,25 @@ func connect2(hosts []string, path string) (*client2, error) {
}, nil }, nil
} }
func (c *client2) stopped() <-chan struct{} { func (c *client2) Stopped() <-chan struct{} {
return c.done return c.done
} }
func (c *client2) stop() { func (c *client2) Stop() {
c.stopOnce.Do(c.Close) c.stopOnce.Do(c.Close)
} }
func (c *client2) data(path string) (data []byte, err error) { func (c *client2) Data(path string) (data []byte, err error) {
data, _, err = c.Get(path) data, _, err = c.Get(path)
return return
} }
func (c *client2) watchChildren(path string) (string, <-chan []string, <-chan error) { func (c *client2) WatchChildren(path string) (string, <-chan []string, <-chan error) {
errCh := make(chan error, 1) errCh := make(chan error, 1)
snap := make(chan []string) snap := make(chan []string)
watchPath := c.path watchPath := c.path
if path != "" && path != currentPath { if path != "" && path != CurrentPath {
watchPath = watchPath + path watchPath = watchPath + path
} }
go func() { go func() {

View File

@ -42,37 +42,57 @@ const (
defaultMinDetectorCyclePeriod = 1 * time.Second defaultMinDetectorCyclePeriod = 1 * time.Second
) )
type (
ZKInterface interface {
Stopped() <-chan struct{}
Stop()
Data(string) ([]byte, error)
WatchChildren(string) (string, <-chan []string, <-chan error)
}
infoCodec func(path, node string) (*mesos.MasterInfo, error)
// Detector uses ZooKeeper to detect new leading master.
MasterDetector struct {
// detection should not signal master change listeners more frequently than this
cancel func()
client ZKInterface
done chan struct{}
// latch: only install, at most, one ignoreChanged listener; see MasterDetector.Detect
ignoreInstalled int32
leaderNode string
minDetectorCyclePeriod time.Duration
// guard against concurrent invocations of bootstrapFunc
bootstrapLock sync.RWMutex
bootstrapFunc func(ZKInterface, <-chan struct{}) (ZKInterface, error) // for one-time zk client initiation
}
)
// reasonable default for a noop change listener // reasonable default for a noop change listener
var ignoreChanged = detector.OnMasterChanged(func(*mesos.MasterInfo) {}) var ignoreChanged = detector.OnMasterChanged(func(*mesos.MasterInfo) {})
type zkInterface interface { // MinCyclePeriod is a functional option that determines the highest frequency of master change notifications
stopped() <-chan struct{} func MinCyclePeriod(d time.Duration) detector.Option {
stop() return func(di interface{}) detector.Option {
data(string) ([]byte, error) md := di.(*MasterDetector)
watchChildren(string) (string, <-chan []string, <-chan error) old := md.minDetectorCyclePeriod
md.minDetectorCyclePeriod = d
return MinCyclePeriod(old)
}
} }
type infoCodec func(path, node string) (*mesos.MasterInfo, error) func Bootstrap(f func(ZKInterface, <-chan struct{}) (ZKInterface, error)) detector.Option {
return func(di interface{}) detector.Option {
// Detector uses ZooKeeper to detect new leading master. md := di.(*MasterDetector)
type MasterDetector struct { old := md.bootstrapFunc
client zkInterface md.bootstrapFunc = f
leaderNode string return Bootstrap(old)
}
bootstrapLock sync.RWMutex // guard against concurrent invocations of bootstrapFunc
bootstrapFunc func() error // for one-time zk client initiation
// latch: only install, at most, one ignoreChanged listener; see MasterDetector.Detect
ignoreInstalled int32
// detection should not signal master change listeners more frequently than this
minDetectorCyclePeriod time.Duration
done chan struct{}
cancel func()
} }
// Internal constructor function // Internal constructor function
func NewMasterDetector(zkurls string) (*MasterDetector, error) { func NewMasterDetector(zkurls string, options ...detector.Option) (*MasterDetector, error) {
zkHosts, zkPath, err := parseZk(zkurls) zkHosts, zkPath, err := parseZk(zkurls)
if err != nil { if err != nil {
log.Fatalln("Failed to parse url", err) log.Fatalln("Failed to parse url", err)
@ -85,11 +105,16 @@ func NewMasterDetector(zkurls string) (*MasterDetector, error) {
cancel: func() {}, cancel: func() {},
} }
detector.bootstrapFunc = func() (err error) { detector.bootstrapFunc = func(client ZKInterface, _ <-chan struct{}) (ZKInterface, error) {
if detector.client == nil { if client == nil {
detector.client, err = connect2(zkHosts, zkPath) return connect2(zkHosts, zkPath)
} }
return return client, nil
}
// apply options last so that they can override default behavior
for _, opt := range options {
opt(detector)
} }
log.V(2).Infoln("Created new detector to watch", zkHosts, zkPath) log.V(2).Infoln("Created new detector to watch", zkHosts, zkPath)
@ -157,7 +182,7 @@ func logPanic(f func()) {
} }
func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo, error) { func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo, error) {
data, err := md.client.data(fmt.Sprintf("%s/%s", path, node)) data, err := md.client.Data(fmt.Sprintf("%s/%s", path, node))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to retrieve leader data: %v", err) return nil, fmt.Errorf("failed to retrieve leader data: %v", err)
} }
@ -171,7 +196,7 @@ func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo,
} }
func (md *MasterDetector) pullMasterJsonInfo(path, node string) (*mesos.MasterInfo, error) { func (md *MasterDetector) pullMasterJsonInfo(path, node string) (*mesos.MasterInfo, error) {
data, err := md.client.data(fmt.Sprintf("%s/%s", path, node)) data, err := md.client.Data(fmt.Sprintf("%s/%s", path, node))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to retrieve leader data: %v", err) return nil, fmt.Errorf("failed to retrieve leader data: %v", err)
} }
@ -226,13 +251,15 @@ func (md *MasterDetector) callBootstrap() (e error) {
defer md.bootstrapLock.Unlock() defer md.bootstrapLock.Unlock()
clientConfigured := md.client != nil clientConfigured := md.client != nil
if e = md.bootstrapFunc(); e == nil && !clientConfigured && md.client != nil {
md.client, e = md.bootstrapFunc(md.client, md.done)
if e == nil && !clientConfigured && md.client != nil {
// chain the lifetime of this detector to that of the newly created client impl // chain the lifetime of this detector to that of the newly created client impl
client := md.client client := md.client
md.cancel = client.stop md.cancel = client.Stop
go func() { go func() {
defer close(md.done) defer close(md.done)
<-client.stopped() <-client.Stopped()
}() }()
} }
return return
@ -265,7 +292,7 @@ func (md *MasterDetector) Detect(f detector.MasterChanged) (err error) {
} }
func (md *MasterDetector) detect(f detector.MasterChanged) { func (md *MasterDetector) detect(f detector.MasterChanged) {
log.V(3).Infoln("detecting children at", currentPath) log.V(3).Infoln("detecting children at", CurrentPath)
detectLoop: detectLoop:
for { for {
select { select {
@ -273,8 +300,8 @@ detectLoop:
return return
default: default:
} }
log.V(3).Infoln("watching children at", currentPath) log.V(3).Infoln("watching children at", CurrentPath)
path, childrenCh, errCh := md.client.watchChildren(currentPath) path, childrenCh, errCh := md.client.WatchChildren(CurrentPath)
rewatch := false rewatch := false
for { for {
started := time.Now() started := time.Now()

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package zoo package mock
import ( import (
"github.com/samuel/go-zookeeper/zk" "github.com/samuel/go-zookeeper/zk"
@ -25,19 +25,19 @@ import (
// Impersontates a zk.Connection // Impersontates a zk.Connection
// It implements interface Connector // It implements interface Connector
type MockConnector struct { type Connector struct {
mock.Mock mock.Mock
} }
func NewMockConnector() *MockConnector { func NewConnector() *Connector {
return new(MockConnector) return new(Connector)
} }
func (conn *MockConnector) Close() { func (conn *Connector) Close() {
conn.Called() conn.Called()
} }
func (conn *MockConnector) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) { func (conn *Connector) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) {
args := conn.Called(path) args := conn.Called(path)
var ( var (
arg0 []string arg0 []string
@ -56,14 +56,14 @@ func (conn *MockConnector) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk
return arg0, arg1, arg2, args.Error(3) return arg0, arg1, arg2, args.Error(3)
} }
func (conn *MockConnector) Children(path string) ([]string, *zk.Stat, error) { func (conn *Connector) Children(path string) ([]string, *zk.Stat, error) {
args := conn.Called(path) args := conn.Called(path)
return args.Get(0).([]string), return args.Get(0).([]string),
args.Get(1).(*zk.Stat), args.Get(1).(*zk.Stat),
args.Error(2) args.Error(2)
} }
func (conn *MockConnector) Get(path string) ([]byte, *zk.Stat, error) { func (conn *Connector) Get(path string) ([]byte, *zk.Stat, error) {
args := conn.Called(path) args := conn.Called(path)
return args.Get(0).([]byte), return args.Get(0).([]byte),
args.Get(1).(*zk.Stat), args.Get(1).(*zk.Stat),

View File

@ -0,0 +1,81 @@
package mock
import (
"sync"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector/zoo"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/mock"
)
type Client struct {
mock.Mock
}
func (m *Client) Stopped() (a <-chan struct{}) {
args := m.Called()
if x := args.Get(0); x != nil {
a = x.(<-chan struct{})
}
return
}
func (m *Client) Stop() {
m.Called()
}
func (m *Client) Data(path string) (a []byte, b error) {
args := m.Called(path)
if x := args.Get(0); x != nil {
a = x.([]byte)
}
b = args.Error(1)
return
}
func (m *Client) WatchChildren(path string) (a string, b <-chan []string, c <-chan error) {
args := m.Called(path)
a = args.String(0)
if x := args.Get(1); x != nil {
b = x.(<-chan []string)
}
if x := args.Get(2); x != nil {
c = x.(<-chan error)
}
return
}
// newMockZkClient returns a mocked implementation of ZKInterface that implements expectations
// for Stop() and Stopped(); multiple calls to Stop() are safe.
func NewClient(testZkPath string, initialChildren ...string) (mocked *Client, snaps chan []string, errs chan error) {
var doneOnce sync.Once
done := make(chan struct{})
mocked = &Client{}
mocked.On("Stop").Return().Run(func(_ mock.Arguments) { doneOnce.Do(func() { close(done) }) })
mocked.On("Stopped").Return((<-chan struct{})(done))
if initialChildren != nil {
errs = make(chan error) // this is purposefully unbuffered (some tests depend on this)
snaps = make(chan []string, 1)
snaps <- initialChildren[:]
mocked.On("WatchChildren", zoo.CurrentPath).Return(
testZkPath, (<-chan []string)(snaps), (<-chan error)(errs)).Run(
func(_ mock.Arguments) { log.V(1).Infoln("WatchChildren invoked") })
}
return
}
// implements MasterChanged and AllMasters extension
type AllMastersListener struct {
mock.Mock
}
func (a *AllMastersListener) OnMasterChanged(mi *mesos.MasterInfo) {
a.Called(mi)
}
func (a *AllMastersListener) UpdatedMasters(mi []*mesos.MasterInfo) {
a.Called(mi)
}

View File

@ -5,7 +5,7 @@ import (
) )
func init() { func init() {
detector.Register("zk://", detector.PluginFactory(func(spec string) (detector.Master, error) { detector.Register("zk://", detector.PluginFactory(func(spec string, options ...detector.Option) (detector.Master, error) {
return NewMasterDetector(spec) return NewMasterDetector(spec, options...)
})) }))
} }

View File

@ -16,59 +16,60 @@
* limitations under the License. * limitations under the License.
*/ */
package executor package mock
import ( import (
"github.com/mesos/mesos-go/executor"
"github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
) )
// MockedExecutor is used for testing the executor driver. // Executor is used for testing the executor driver.
type MockedExecutor struct { type Executor struct {
mock.Mock mock.Mock
} }
// NewMockedExecutor returns a mocked executor. // New returns a mocked executor.
func NewMockedExecutor() *MockedExecutor { func New() *Executor {
return &MockedExecutor{} return &Executor{}
} }
// Registered implements the Registered handler. // Registered implements the Registered handler.
func (e *MockedExecutor) Registered(ExecutorDriver, *mesosproto.ExecutorInfo, *mesosproto.FrameworkInfo, *mesosproto.SlaveInfo) { func (e *Executor) Registered(executor.ExecutorDriver, *mesosproto.ExecutorInfo, *mesosproto.FrameworkInfo, *mesosproto.SlaveInfo) {
e.Called() e.Called()
} }
// Reregistered implements the Reregistered handler. // Reregistered implements the Reregistered handler.
func (e *MockedExecutor) Reregistered(ExecutorDriver, *mesosproto.SlaveInfo) { func (e *Executor) Reregistered(executor.ExecutorDriver, *mesosproto.SlaveInfo) {
e.Called() e.Called()
} }
// Disconnected implements the Disconnected handler. // Disconnected implements the Disconnected handler.
func (e *MockedExecutor) Disconnected(ExecutorDriver) { func (e *Executor) Disconnected(executor.ExecutorDriver) {
e.Called() e.Called()
} }
// LaunchTask implements the LaunchTask handler. // LaunchTask implements the LaunchTask handler.
func (e *MockedExecutor) LaunchTask(ExecutorDriver, *mesosproto.TaskInfo) { func (e *Executor) LaunchTask(executor.ExecutorDriver, *mesosproto.TaskInfo) {
e.Called() e.Called()
} }
// KillTask implements the KillTask handler. // KillTask implements the KillTask handler.
func (e *MockedExecutor) KillTask(ExecutorDriver, *mesosproto.TaskID) { func (e *Executor) KillTask(executor.ExecutorDriver, *mesosproto.TaskID) {
e.Called() e.Called()
} }
// FrameworkMessage implements the FrameworkMessage handler. // FrameworkMessage implements the FrameworkMessage handler.
func (e *MockedExecutor) FrameworkMessage(ExecutorDriver, string) { func (e *Executor) FrameworkMessage(executor.ExecutorDriver, string) {
e.Called() e.Called()
} }
// Shutdown implements the Shutdown handler. // Shutdown implements the Shutdown handler.
func (e *MockedExecutor) Shutdown(ExecutorDriver) { func (e *Executor) Shutdown(executor.ExecutorDriver) {
e.Called() e.Called()
} }
// Error implements the Error handler. // Error implements the Error handler.
func (e *MockedExecutor) Error(ExecutorDriver, string) { func (e *Executor) Error(executor.ExecutorDriver, string) {
e.Called() e.Called()
} }

View File

@ -0,0 +1,42 @@
package executor
import (
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
type TestDriver struct {
*MesosExecutorDriver
}
func (e *TestDriver) SetConnected(b bool) {
e.guarded(func() {
e.connected = b
})
}
func (e *TestDriver) SetMessenger(m messenger.Messenger) {
e.messenger = m
}
func (e *TestDriver) Started() <-chan struct{} {
return e.started
}
func (e *TestDriver) guarded(f func()) {
e.lock.Lock()
defer e.lock.Unlock()
f()
}
func (e *TestDriver) Context() context.Context {
return e.context()
}
func (e *TestDriver) StatusUpdateAcknowledgement(ctx context.Context, from *upid.UPID, msg proto.Message) {
e.guarded(func() {
e.statusUpdateAcknowledgement(ctx, from, msg)
})
}

View File

@ -16,12 +16,13 @@
* limitations under the License. * limitations under the License.
*/ */
package messenger package mock
import ( import (
"reflect" "reflect"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/upid" "github.com/mesos/mesos-go/upid"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -32,46 +33,46 @@ type message struct {
msg proto.Message msg proto.Message
} }
// MockedMessenger is a messenger that returns error on every operation. // Messenger is a messenger that returns error on every operation.
type MockedMessenger struct { type Messenger struct {
mock.Mock mock.Mock
messageQueue chan *message messageQueue chan *message
handlers map[string]MessageHandler handlers map[string]messenger.MessageHandler
stop chan struct{} stop chan struct{}
} }
// NewMockedMessenger returns a mocked messenger used for testing. // NewMessenger returns a mocked messenger used for testing.
func NewMockedMessenger() *MockedMessenger { func NewMessenger() *Messenger {
return &MockedMessenger{ return &Messenger{
messageQueue: make(chan *message, 1), messageQueue: make(chan *message, 1),
handlers: make(map[string]MessageHandler), handlers: make(map[string]messenger.MessageHandler),
stop: make(chan struct{}), stop: make(chan struct{}),
} }
} }
// Install is a mocked implementation. // Install is a mocked implementation.
func (m *MockedMessenger) Install(handler MessageHandler, msg proto.Message) error { func (m *Messenger) Install(handler messenger.MessageHandler, msg proto.Message) error {
m.handlers[reflect.TypeOf(msg).Elem().Name()] = handler m.handlers[reflect.TypeOf(msg).Elem().Name()] = handler
return m.Called().Error(0) return m.Called().Error(0)
} }
// Send is a mocked implementation. // Send is a mocked implementation.
func (m *MockedMessenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error { func (m *Messenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
return m.Called().Error(0) return m.Called().Error(0)
} }
func (m *MockedMessenger) Route(ctx context.Context, upid *upid.UPID, msg proto.Message) error { func (m *Messenger) Route(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
return m.Called().Error(0) return m.Called().Error(0)
} }
// Start is a mocked implementation. // Start is a mocked implementation.
func (m *MockedMessenger) Start() error { func (m *Messenger) Start() error {
go m.recvLoop() go m.recvLoop()
return m.Called().Error(0) return m.Called().Error(0)
} }
// Stop is a mocked implementation. // Stop is a mocked implementation.
func (m *MockedMessenger) Stop() error { func (m *Messenger) Stop() error {
// don't close an already-closed channel // don't close an already-closed channel
select { select {
case <-m.stop: case <-m.stop:
@ -83,11 +84,11 @@ func (m *MockedMessenger) Stop() error {
} }
// UPID is a mocked implementation. // UPID is a mocked implementation.
func (m *MockedMessenger) UPID() upid.UPID { func (m *Messenger) UPID() upid.UPID {
return m.Called().Get(0).(upid.UPID) return m.Called().Get(0).(upid.UPID)
} }
func (m *MockedMessenger) recvLoop() { func (m *Messenger) recvLoop() {
for { for {
select { select {
case <-m.stop: case <-m.stop:
@ -101,6 +102,6 @@ func (m *MockedMessenger) recvLoop() {
// Recv receives a upid and a message, it will dispatch the message to its handler // Recv receives a upid and a message, it will dispatch the message to its handler
// with the upid. This is for testing. // with the upid. This is for testing.
func (m *MockedMessenger) Recv(from *upid.UPID, msg proto.Message) { func (m *Messenger) Recv(from *upid.UPID, msg proto.Message) {
m.messageQueue <- &message{from, msg} m.messageQueue <- &message{from, msg}
} }

View File

@ -1,16 +1,18 @@
package scheduler package mock
import ( import (
log "github.com/golang/glog" log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
. "github.com/mesos/mesos-go/scheduler"
) )
type MockScheduler struct { type MockScheduler struct {
mock.Mock mock.Mock
} }
func NewMockScheduler() *MockScheduler { func New() *MockScheduler {
return &MockScheduler{} return &MockScheduler{}
} }

View File

@ -809,7 +809,7 @@ func (driver *MesosSchedulerDriver) frameworkMessageRcvd(_ context.Context, from
func (driver *MesosSchedulerDriver) frameworkErrorRcvd(ctx context.Context, from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) frameworkErrorRcvd(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling framework error event.") log.V(1).Infoln("Handling framework error event.")
msg := pbMsg.(*mesos.FrameworkErrorMessage) msg := pbMsg.(*mesos.FrameworkErrorMessage)
driver.error(ctx, msg.GetMessage()) driver.fatal(ctx, msg.GetMessage())
} }
// ---------------------- Interface Methods ---------------------- // // ---------------------- Interface Methods ---------------------- //
@ -1530,7 +1530,7 @@ func (driver *MesosSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus)
} }
// error expects to be guarded by eventLock // error expects to be guarded by eventLock
func (driver *MesosSchedulerDriver) error(ctx context.Context, err string) { func (driver *MesosSchedulerDriver) fatal(ctx context.Context, err string) {
if driver.status == mesos.Status_DRIVER_ABORTED { if driver.status == mesos.Status_DRIVER_ABORTED {
log.V(3).Infoln("Ignoring error message, the driver is aborted!") log.V(3).Infoln("Ignoring error message, the driver is aborted!")
return return

View File

@ -0,0 +1,78 @@
package scheduler
import (
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
type TestDriver struct {
*MesosSchedulerDriver
}
func (t *TestDriver) SetConnected(b bool) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.connected = b
}
func (t *TestDriver) Started() <-chan struct{} {
return t.started
}
func (t *TestDriver) Stopped() <-chan struct{} {
return t.stopCh
}
func (t *TestDriver) Done() <-chan struct{} {
return t.done
}
func (t *TestDriver) Framework() *mesos.FrameworkInfo {
return t.frameworkInfo
}
func (t *TestDriver) UPID() *upid.UPID {
return t.self
}
func (t *TestDriver) MasterPID() *upid.UPID {
return t.masterPid
}
func (t *TestDriver) Fatal(ctx context.Context, msg string) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.fatal(ctx, msg)
}
func (t *TestDriver) OnDispatch(f func(ctx context.Context, upid *upid.UPID, msg proto.Message) error) {
t.dispatch = f
}
func (t *TestDriver) HandleMasterChanged(ctx context.Context, from *upid.UPID, msg proto.Message) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.handleMasterChanged(ctx, from, msg)
}
func (t *TestDriver) CacheOffer(offer *mesos.Offer, pid *upid.UPID) {
t.cache.putOffer(offer, pid)
}
func (t *TestDriver) Context() context.Context {
return t.context()
}
func (t *TestDriver) FrameworkRegistered(ctx context.Context, from *upid.UPID, msg proto.Message) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.frameworkRegistered(ctx, from, msg)
}
func (t *TestDriver) FrameworkReregistered(ctx context.Context, from *upid.UPID, msg proto.Message) {
t.eventLock.Lock()
defer t.eventLock.Unlock()
t.frameworkReregistered(ctx, from, msg)
}