diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 858f13ea147..ce35855f176 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -387,7 +387,7 @@ }, { "ImportPath": "github.com/mesos/mesos-go/auth", - "Rev": "4b1767c0dfc51020e01f35da5b38472f40ce572a" + "Rev": "6440c09c9d8a1b365f3c3e9b2297dd856abd017c" }, { "ImportPath": "github.com/mesos/mesos-go/detector", diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go index 9fd37b6fb96..2a4d5873212 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go @@ -65,26 +65,27 @@ func TestAuthticatee_validLogin(t *testing.T) { transport.On("Stop").Return(nil) transport.On("Send", mock.Anything, &server, &mesos.AuthenticateMessage{ Pid: proto.String(client.String()), - }).Return(nil).Once() + }).Return(nil).Run(func(_ mock.Arguments) { + transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{ + Mechanisms: []string{crammd5.Name}, + }) + }).Once() transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStartMessage{ Mechanism: proto.String(crammd5.Name), Data: proto.String(""), // may be nil, depends on init step - }).Return(nil).Once() - - transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{ - Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`), - }).Return(nil).Once() - - go func() { - transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{ - Mechanisms: []string{crammd5.Name}, - }) + }).Return(nil).Run(func(_ mock.Arguments) { transport.Recv(&server, &mesos.AuthenticationStepMessage{ Data: []byte(`lsd;lfkgjs;dlfkgjs;dfklg`), }) + }).Once() + + transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{ + Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`), + }).Return(nil).Run(func(_ mock.Arguments) { transport.Recv(&server, &mesos.AuthenticationCompletedMessage{}) - }() + }).Once() + return transport }) login, err := makeAuthenticatee(handler, factory) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go index 7c0554f657a..2cbe78fdd44 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go @@ -130,7 +130,7 @@ func CreateMasterInfo(pid *upid.UPID) *mesos.MasterInfo { // This is needed for the people cross-compiling from macos to linux. // The cross-compiled version of net.LookupIP() fails to handle plain IPs. // See https://github.com/mesos/mesos-go/pull/117 - } else if addrs, err := net.LookupIP(pid.Host); err == nil { + } else if addrs, err := net.LookupIP(pid.Host); err == nil { for _, ip := range addrs { if ip = ip.To4(); ip != nil { ipv4 = ip diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go index 03e0bb7cfc0..d9081bbca51 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go @@ -27,6 +27,21 @@ type MasterChanged interface { OnMasterChanged(*mesos.MasterInfo) } +// AllMasters defines an optional interface that, if implemented by the same +// struct as implements MasterChanged, will receive an additional callbacks +// independently of leadership changes. it's possible that, as a result of a +// leadership change, both the OnMasterChanged and UpdatedMasters callbacks +// would be invoked. +// +// **NOTE:** Detector implementations are not required to support this optional +// interface. Please RTFM of the detector implementation that you want to use. +type AllMasters interface { + // UpdatedMasters is invoked upon a change in the membership of mesos + // masters, and is useful to clients that wish to know the entire set + // of Mesos masters currently running. + UpdatedMasters([]*mesos.MasterInfo) +} + // func/interface adapter type OnMasterChanged func(*mesos.MasterInfo) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go index 596ee196624..1ebca734b36 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strings" + "sync/atomic" "testing" "time" @@ -237,9 +238,9 @@ func TestWatchChildren_flappy(t *testing.T) { })) go c.connect() - watchChildrenCount := 0 + var watchChildrenCount uint64 watcherFunc := ChildWatcher(func(zkc *Client, path string) { - log.V(1).Infof("ChildWatcher invoked %d", watchChildrenCount) + log.V(1).Infof("ChildWatcher invoked %d", atomic.LoadUint64(&watchChildrenCount)) }) startTime := time.Now() endTime := startTime.Add(2 * time.Second) @@ -252,7 +253,7 @@ watcherLoop: if _, err := c.watchChildren(currentPath, watcherFunc); err == nil { // watching children succeeded!! t.Logf("child watch success") - watchChildrenCount++ + atomic.AddUint64(&watchChildrenCount, 1) } else { // setting the watch failed t.Logf("setting child watch failed: %v", err) @@ -264,7 +265,9 @@ watcherLoop: case <-time.After(endTime.Sub(time.Now())): } } - assert.Equal(t, 5, watchChildrenCount, "expected watchChildrenCount = 5 instead of %d, should be reinvoked upon initial ChildrenW failures", watchChildrenCount) + + wantChildrenCount := atomic.LoadUint64(&watchChildrenCount) + assert.Equal(t, uint64(5), wantChildrenCount, "expected watchChildrenCount = 5 instead of %d, should be reinvoked upon initial ChildrenW failures", wantChildrenCount) } func makeClient() (*Client, error) { diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go index 3c2cefa235a..48eee4b4140 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go @@ -112,8 +112,12 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector return } - topNode := selectTopNode(list) + md.notifyMasterChanged(path, list, obs) + md.notifyAllMasters(path, list, obs) +} +func (md *MasterDetector) notifyMasterChanged(path string, list []string, obs detector.MasterChanged) { + topNode := selectTopNode(list) if md.leaderNode == topNode { log.V(2).Infof("ignoring children-changed event, leader has not changed: %v", path) return @@ -124,21 +128,57 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector var masterInfo *mesos.MasterInfo if md.leaderNode != "" { - data, err := zkc.data(fmt.Sprintf("%s/%s", path, topNode)) - if err != nil { - log.Errorf("unable to retrieve leader data: %v", err.Error()) - return - } - - masterInfo = new(mesos.MasterInfo) - err = proto.Unmarshal(data, masterInfo) - if err != nil { - log.Errorf("unable to unmarshall MasterInfo data from zookeeper: %v", err) - return + var err error + if masterInfo, err = md.pullMasterInfo(path, topNode); err != nil { + log.Errorln(err.Error()) } } log.V(2).Infof("detected master info: %+v", masterInfo) - obs.OnMasterChanged(masterInfo) + logPanic(func() { obs.OnMasterChanged(masterInfo) }) +} + +// logPanic safely executes the given func, recovering from and logging a panic if one occurs. +func logPanic(f func()) { + defer func() { + if r := recover(); r != nil { + log.Errorf("recovered from client panic: %v", r) + } + }() + f() +} + +func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo, error) { + data, err := md.client.data(fmt.Sprintf("%s/%s", path, node)) + if err != nil { + return nil, fmt.Errorf("failed to retrieve leader data: %v", err) + } + + masterInfo := &mesos.MasterInfo{} + err = proto.Unmarshal(data, masterInfo) + if err != nil { + return nil, fmt.Errorf("failed to unmarshall MasterInfo data from zookeeper: %v", err) + } + return masterInfo, nil +} + +func (md *MasterDetector) notifyAllMasters(path string, list []string, obs detector.MasterChanged) { + all, ok := obs.(detector.AllMasters) + if !ok { + // not interested in entire master list + return + } + masters := []*mesos.MasterInfo{} + for _, node := range list { + info, err := md.pullMasterInfo(path, node) + if err != nil { + log.Errorln(err.Error()) + } else { + masters = append(masters, info) + } + } + + log.V(2).Infof("notifying of master membership change: %+v", masters) + logPanic(func() { all.UpdatedMasters(masters) }) } // the first call to Detect will kickstart a connection to zookeeper. a nil change listener may diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go index 2cbd1e11b71..de1ce9762a9 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go @@ -8,11 +8,13 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" log "github.com/golang/glog" "github.com/mesos/mesos-go/detector" mesos "github.com/mesos/mesos-go/mesosproto" "github.com/samuel/go-zookeeper/zk" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) const ( @@ -324,10 +326,10 @@ func TestMasterDetectMultiple(t *testing.T) { // **** Test 4 consecutive ChildrenChangedEvents ****** // setup event changes sequences := [][]string{ - []string{"info_014", "info_010", "info_005"}, - []string{"info_005", "info_004", "info_022"}, - []string{}, // indicates no master - []string{"info_017", "info_099", "info_200"}, + {"info_014", "info_010", "info_005"}, + {"info_005", "info_004", "info_022"}, + {}, // indicates no master + {"info_017", "info_099", "info_200"}, } var wg sync.WaitGroup @@ -425,3 +427,138 @@ func TestMasterDetect_selectTopNode_mixedEntries(t *testing.T) { node := selectTopNode(nodeList) assert.Equal("info_0000000032", node) } + +// implements MasterChanged and AllMasters extension +type allMastersListener struct { + mock.Mock +} + +func (a *allMastersListener) OnMasterChanged(mi *mesos.MasterInfo) { + a.Called(mi) +} + +func (a *allMastersListener) UpdatedMasters(mi []*mesos.MasterInfo) { + a.Called(mi) +} + +func afterFunc(f func()) <-chan struct{} { + ch := make(chan struct{}) + go func() { + defer close(ch) + f() + }() + return ch +} + +func fatalAfter(t *testing.T, d time.Duration, f func(), msg string, args ...interface{}) { + ch := afterFunc(f) + select { + case <-ch: + return + case <-time.After(d): + t.Fatalf(msg, args...) + } +} + +func TestNotifyAllMasters(t *testing.T) { + c, err := newClient(test_zk_hosts, test_zk_path) + assert.NoError(t, err) + + childEvents := make(chan zk.Event, 5) + connector := NewMockConnector() + + c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) { + sessionEvents := make(chan zk.Event, 1) + sessionEvents <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnected, + } + return connector, sessionEvents, nil + })) + + md, err := NewMasterDetector(zkurl) + defer md.Cancel() + + assert.NoError(t, err) + + c.errorHandler = ErrorHandler(func(c *Client, e error) { + t.Fatalf("unexpected error: %v", e) + }) + md.client = c + + listener := &allMastersListener{} + + //-- expect primer + var primer sync.WaitGroup + ignoreArgs := func(f func()) func(mock.Arguments) { + primer.Add(1) + return func(_ mock.Arguments) { + f() + } + } + connector.On("Children", test_zk_path).Return([]string{}, &zk.Stat{}, nil).Run(ignoreArgs(primer.Done)).Once() + listener.On("UpdatedMasters", []*mesos.MasterInfo{}).Return().Run(ignoreArgs(primer.Done)).Once() + connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(primer.Done)).Once() + md.Detect(listener) + fatalAfter(t, 3*time.Second, primer.Wait, "timed out waiting for detection primer") + + listener.AssertExpectations(t) + connector.AssertExpectations(t) + + //-- test membership changes + type expectedGets struct { + info []byte + err error + } + tt := []struct { + zkEntry []string + gets []expectedGets + leaderIdx int + }{ + {[]string{"info_004"}, []expectedGets{{newTestMasterInfo(1), nil}}, 0}, + {[]string{"info_007", "info_005", "info_006"}, []expectedGets{{newTestMasterInfo(2), nil}, {newTestMasterInfo(3), nil}, {newTestMasterInfo(4), nil}}, 1}, + {nil, nil, -1}, + } + for j, tc := range tt { + // expectations + var tcwait sync.WaitGroup + ignoreArgs = func(f func()) func(mock.Arguments) { + tcwait.Add(1) + return func(_ mock.Arguments) { + f() + } + } + + expectedInfos := []*mesos.MasterInfo{} + for i, zke := range tc.zkEntry { + connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, zke)).Return(tc.gets[i].info, &zk.Stat{}, tc.gets[i].err).Run(ignoreArgs(tcwait.Done)).Once() + masterInfo := &mesos.MasterInfo{} + err = proto.Unmarshal(tc.gets[i].info, masterInfo) + if err != nil { + t.Fatalf("failed to unmarshall MasterInfo data: %v", err) + } + expectedInfos = append(expectedInfos, masterInfo) + } + if len(tc.zkEntry) > 0 { + connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, tc.zkEntry[tc.leaderIdx])).Return( + tc.gets[tc.leaderIdx].info, &zk.Stat{}, tc.gets[tc.leaderIdx].err).Run(ignoreArgs(tcwait.Done)).Once() + } + connector.On("Children", test_zk_path).Return(tc.zkEntry, &zk.Stat{}, nil).Run(ignoreArgs(tcwait.Done)).Once() + listener.On("OnMasterChanged", mock.AnythingOfType("*mesosproto.MasterInfo")).Return().Run(ignoreArgs(tcwait.Done)).Once() + listener.On("UpdatedMasters", expectedInfos).Return().Run(ignoreArgs(tcwait.Done)).Once() + connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(tcwait.Done)).Once() + + // fire the event that triggers the test case + childEvents <- zk.Event{ + Type: zk.EventNodeChildrenChanged, + Path: test_zk_path, + } + + // allow plenty of time for all the async processing to happen + fatalAfter(t, 5*time.Second, tcwait.Wait, "timed out waiting for all-masters test case %d", j+1) + listener.AssertExpectations(t) + connector.AssertExpectations(t) + } + + connector.On("Close").Return(nil) +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/doc.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/doc.go new file mode 100644 index 00000000000..010ba055d85 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/doc.go @@ -0,0 +1,3 @@ +// Zookeeper-based mesos-master leaderhip detection. +// Implements support for optional detector.AllMasters interface. +package zoo diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example-executor b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example-executor new file mode 100755 index 00000000000..540c230027f Binary files /dev/null and b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example-executor differ diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example-scheduler b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example-scheduler new file mode 100755 index 00000000000..932556ff1e6 Binary files /dev/null and b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example-scheduler differ diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example_executor.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example_executor.go new file mode 100644 index 00000000000..91c9d624193 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example_executor.go @@ -0,0 +1,121 @@ +// +build example-exec + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "flag" + "fmt" + exec "github.com/mesos/mesos-go/executor" + mesos "github.com/mesos/mesos-go/mesosproto" +) + +type exampleExecutor struct { + tasksLaunched int +} + +func newExampleExecutor() *exampleExecutor { + return &exampleExecutor{tasksLaunched: 0} +} + +func (exec *exampleExecutor) Registered(driver exec.ExecutorDriver, execInfo *mesos.ExecutorInfo, fwinfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) { + fmt.Println("Registered Executor on slave ", slaveInfo.GetHostname()) +} + +func (exec *exampleExecutor) Reregistered(driver exec.ExecutorDriver, slaveInfo *mesos.SlaveInfo) { + fmt.Println("Re-registered Executor on slave ", slaveInfo.GetHostname()) +} + +func (exec *exampleExecutor) Disconnected(exec.ExecutorDriver) { + fmt.Println("Executor disconnected.") +} + +func (exec *exampleExecutor) LaunchTask(driver exec.ExecutorDriver, taskInfo *mesos.TaskInfo) { + fmt.Println("Launching task", taskInfo.GetName(), "with command", taskInfo.Command.GetValue()) + + runStatus := &mesos.TaskStatus{ + TaskId: taskInfo.GetTaskId(), + State: mesos.TaskState_TASK_RUNNING.Enum(), + } + _, err := driver.SendStatusUpdate(runStatus) + if err != nil { + fmt.Println("Got error", err) + } + + exec.tasksLaunched++ + fmt.Println("Total tasks launched ", exec.tasksLaunched) + // + // this is where one would perform the requested task + // + + // finish task + fmt.Println("Finishing task", taskInfo.GetName()) + finStatus := &mesos.TaskStatus{ + TaskId: taskInfo.GetTaskId(), + State: mesos.TaskState_TASK_FINISHED.Enum(), + } + _, err = driver.SendStatusUpdate(finStatus) + if err != nil { + fmt.Println("Got error", err) + } + fmt.Println("Task finished", taskInfo.GetName()) +} + +func (exec *exampleExecutor) KillTask(exec.ExecutorDriver, *mesos.TaskID) { + fmt.Println("Kill task") +} + +func (exec *exampleExecutor) FrameworkMessage(driver exec.ExecutorDriver, msg string) { + fmt.Println("Got framework message: ", msg) +} + +func (exec *exampleExecutor) Shutdown(exec.ExecutorDriver) { + fmt.Println("Shutting down the executor") +} + +func (exec *exampleExecutor) Error(driver exec.ExecutorDriver, err string) { + fmt.Println("Got error message:", err) +} + +// -------------------------- func inits () ----------------- // +func init() { + flag.Parse() +} + +func main() { + fmt.Println("Starting Example Executor (Go)") + + dconfig := exec.DriverConfig{ + Executor: newExampleExecutor(), + } + driver, err := exec.NewMesosExecutorDriver(dconfig) + + if err != nil { + fmt.Println("Unable to create a ExecutorDriver ", err.Error()) + } + + _, err = driver.Start() + if err != nil { + fmt.Println("Got error:", err) + return + } + fmt.Println("Executor process has started and running.") + driver.Join() +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example_scheduler.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example_scheduler.go new file mode 100644 index 00000000000..5db1dcd3d9d --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/examples/example_scheduler.go @@ -0,0 +1,294 @@ +// +build example-sched + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "net" + "net/http" + "strconv" + "strings" + + "github.com/gogo/protobuf/proto" + log "github.com/golang/glog" + "github.com/mesos/mesos-go/auth" + "github.com/mesos/mesos-go/auth/sasl" + "github.com/mesos/mesos-go/auth/sasl/mech" + mesos "github.com/mesos/mesos-go/mesosproto" + util "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "golang.org/x/net/context" +) + +const ( + CPUS_PER_TASK = 1 + MEM_PER_TASK = 128 + defaultArtifactPort = 12345 +) + +var ( + address = flag.String("address", "127.0.0.1", "Binding address for artifact server") + artifactPort = flag.Int("artifactPort", defaultArtifactPort, "Binding port for artifact server") + authProvider = flag.String("mesos_authentication_provider", sasl.ProviderName, + fmt.Sprintf("Authentication provider to use, default is SASL that supports mechanisms: %+v", mech.ListSupported())) + master = flag.String("master", "127.0.0.1:5050", "Master address ") + executorPath = flag.String("executor", "./example_executor", "Path to test executor") + taskCount = flag.String("task-count", "5", "Total task count to run.") + mesosAuthPrincipal = flag.String("mesos_authentication_principal", "", "Mesos authentication principal.") + mesosAuthSecretFile = flag.String("mesos_authentication_secret_file", "", "Mesos authentication secret file.") +) + +type ExampleScheduler struct { + executor *mesos.ExecutorInfo + tasksLaunched int + tasksFinished int + totalTasks int +} + +func newExampleScheduler(exec *mesos.ExecutorInfo) *ExampleScheduler { + total, err := strconv.Atoi(*taskCount) + if err != nil { + total = 5 + } + return &ExampleScheduler{ + executor: exec, + tasksLaunched: 0, + tasksFinished: 0, + totalTasks: total, + } +} + +func (sched *ExampleScheduler) Registered(driver sched.SchedulerDriver, frameworkId *mesos.FrameworkID, masterInfo *mesos.MasterInfo) { + log.Infoln("Framework Registered with Master ", masterInfo) +} + +func (sched *ExampleScheduler) Reregistered(driver sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Infoln("Framework Re-Registered with Master ", masterInfo) +} + +func (sched *ExampleScheduler) Disconnected(sched.SchedulerDriver) {} + +func (sched *ExampleScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + + for _, offer := range offers { + cpuResources := util.FilterResources(offer.Resources, func(res *mesos.Resource) bool { + return res.GetName() == "cpus" + }) + cpus := 0.0 + for _, res := range cpuResources { + cpus += res.GetScalar().GetValue() + } + + memResources := util.FilterResources(offer.Resources, func(res *mesos.Resource) bool { + return res.GetName() == "mem" + }) + mems := 0.0 + for _, res := range memResources { + mems += res.GetScalar().GetValue() + } + + log.Infoln("Received Offer <", offer.Id.GetValue(), "> with cpus=", cpus, " mem=", mems) + + remainingCpus := cpus + remainingMems := mems + + var tasks []*mesos.TaskInfo + for sched.tasksLaunched < sched.totalTasks && + CPUS_PER_TASK <= remainingCpus && + MEM_PER_TASK <= remainingMems { + + sched.tasksLaunched++ + + taskId := &mesos.TaskID{ + Value: proto.String(strconv.Itoa(sched.tasksLaunched)), + } + + task := &mesos.TaskInfo{ + Name: proto.String("go-task-" + taskId.GetValue()), + TaskId: taskId, + SlaveId: offer.SlaveId, + Executor: sched.executor, + Resources: []*mesos.Resource{ + util.NewScalarResource("cpus", CPUS_PER_TASK), + util.NewScalarResource("mem", MEM_PER_TASK), + }, + } + log.Infof("Prepared task: %s with offer %s for launch\n", task.GetName(), offer.Id.GetValue()) + + tasks = append(tasks, task) + remainingCpus -= CPUS_PER_TASK + remainingMems -= MEM_PER_TASK + } + log.Infoln("Launching ", len(tasks), "tasks for offer", offer.Id.GetValue()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, &mesos.Filters{RefuseSeconds: proto.Float64(1)}) + } +} + +func (sched *ExampleScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Infoln("Status update: task", status.TaskId.GetValue(), " is in state ", status.State.Enum().String()) + if status.GetState() == mesos.TaskState_TASK_FINISHED { + sched.tasksFinished++ + } + + if sched.tasksFinished >= sched.totalTasks { + log.Infoln("Total tasks completed, stopping framework.") + driver.Stop(false) + } + + if status.GetState() == mesos.TaskState_TASK_LOST || + status.GetState() == mesos.TaskState_TASK_KILLED || + status.GetState() == mesos.TaskState_TASK_FAILED { + log.Infoln( + "Aborting because task", status.TaskId.GetValue(), + "is in unexpected state", status.State.String(), + "with message", status.GetMessage(), + ) + driver.Abort() + } +} + +func (sched *ExampleScheduler) OfferRescinded(sched.SchedulerDriver, *mesos.OfferID) {} + +func (sched *ExampleScheduler) FrameworkMessage(sched.SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, string) { +} +func (sched *ExampleScheduler) SlaveLost(sched.SchedulerDriver, *mesos.SlaveID) {} +func (sched *ExampleScheduler) ExecutorLost(sched.SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, int) { +} + +func (sched *ExampleScheduler) Error(driver sched.SchedulerDriver, err string) { + log.Infoln("Scheduler received error:", err) +} + +// ----------------------- func init() ------------------------- // + +func init() { + flag.Parse() + log.Infoln("Initializing the Example Scheduler...") +} + +// returns (downloadURI, basename(path)) +func serveExecutorArtifact(path string) (*string, string) { + serveFile := func(pattern string, filename string) { + http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, filename) + }) + } + + // Create base path (http://foobar:5000/) + pathSplit := strings.Split(path, "/") + var base string + if len(pathSplit) > 0 { + base = pathSplit[len(pathSplit)-1] + } else { + base = path + } + serveFile("/"+base, path) + + hostURI := fmt.Sprintf("http://%s:%d/%s", *address, *artifactPort, base) + log.V(2).Infof("Hosting artifact '%s' at '%s'", path, hostURI) + + return &hostURI, base +} + +func prepareExecutorInfo() *mesos.ExecutorInfo { + executorUris := []*mesos.CommandInfo_URI{} + uri, executorCmd := serveExecutorArtifact(*executorPath) + executorUris = append(executorUris, &mesos.CommandInfo_URI{Value: uri, Executable: proto.Bool(true)}) + + executorCommand := fmt.Sprintf("./%s", executorCmd) + + go http.ListenAndServe(fmt.Sprintf("%s:%d", *address, *artifactPort), nil) + log.V(2).Info("Serving executor artifacts...") + + // Create mesos scheduler driver. + return &mesos.ExecutorInfo{ + ExecutorId: util.NewExecutorID("default"), + Name: proto.String("Test Executor (Go)"), + Source: proto.String("go_test"), + Command: &mesos.CommandInfo{ + Value: proto.String(executorCommand), + Uris: executorUris, + }, + } +} + +func parseIP(address string) net.IP { + addr, err := net.LookupIP(address) + if err != nil { + log.Fatal(err) + } + if len(addr) < 1 { + log.Fatalf("failed to parse IP from address '%v'", address) + } + return addr[0] +} + +// ----------------------- func main() ------------------------- // + +func main() { + + // build command executor + exec := prepareExecutorInfo() + + // the framework + fwinfo := &mesos.FrameworkInfo{ + User: proto.String(""), // Mesos-go will fill in user. + Name: proto.String("Test Framework (Go)"), + } + + cred := (*mesos.Credential)(nil) + if *mesosAuthPrincipal != "" { + fwinfo.Principal = proto.String(*mesosAuthPrincipal) + secret, err := ioutil.ReadFile(*mesosAuthSecretFile) + if err != nil { + log.Fatal(err) + } + cred = &mesos.Credential{ + Principal: proto.String(*mesosAuthPrincipal), + Secret: secret, + } + } + bindingAddress := parseIP(*address) + config := sched.DriverConfig{ + Scheduler: newExampleScheduler(exec), + Framework: fwinfo, + Master: *master, + Credential: cred, + BindingAddress: bindingAddress, + WithAuthContext: func(ctx context.Context) context.Context { + ctx = auth.WithLoginProvider(ctx, *authProvider) + ctx = sasl.WithBindingAddress(ctx, bindingAddress) + return ctx + }, + } + driver, err := sched.NewMesosSchedulerDriver(config) + + if err != nil { + log.Errorln("Unable to create a SchedulerDriver ", err.Error()) + } + + if stat, err := driver.Run(); err != nil { + log.Infof("Framework stopped with status %s and error: %s\n", stat.String(), err.Error()) + } + +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go index 05ed98581ee..2f6352c7f54 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor.go @@ -185,6 +185,8 @@ func (driver *MesosExecutorDriver) setStatus(stat mesosproto.Status) { } func (driver *MesosExecutorDriver) Stopped() bool { + driver.lock.RLock() + defer driver.lock.RUnlock() return driver.stopped } @@ -195,6 +197,8 @@ func (driver *MesosExecutorDriver) setStopped(val bool) { } func (driver *MesosExecutorDriver) Connected() bool { + driver.lock.RLock() + defer driver.lock.RUnlock() return driver.connected } @@ -338,6 +342,8 @@ func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID, taskID := msg.GetTaskId() uuid := uuid.UUID(msg.GetUuid()) + driver.lock.Lock() + defer driver.lock.Unlock() if driver.stopped { log.Infof("Ignoring status update acknowledgement %v for task %v of framework %v because the driver is stopped!\n", uuid, taskID, frameworkID) @@ -526,7 +532,9 @@ func (driver *MesosExecutorDriver) SendStatusUpdate(taskStatus *mesosproto.TaskS log.Infof("Executor sending status update %v\n", update.String()) // Capture the status update. + driver.lock.Lock() driver.updates[uuid.UUID(update.GetUuid()).String()] = update + driver.lock.Unlock() // Put the status update in the message. message := &mesosproto.StatusUpdateMessage{ diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_intgr_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_intgr_test.go index 38b72731872..86199a94102 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_intgr_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_intgr_test.go @@ -193,7 +193,7 @@ func TestExecutorDriverExecutorRegisteredEvent(t *testing.T) { } c := testutil.NewMockMesosClient(t, server.PID) c.SendMessage(driver.self, pbMsg) - assert.True(t, driver.connected) + assert.True(t, driver.Connected()) select { case <-ch: case <-time.After(time.Millisecond * 2): diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_test.go index a2894b2c299..067e21cebfe 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/executor/executor_test.go @@ -98,8 +98,10 @@ func TestExecutorDriverStartFailedToParseEnvironment(t *testing.T) { clearEnvironments(t) exec := NewMockedExecutor() exec.On("Error").Return(nil) - driver := newTestExecutorDriver(t, exec) + dconfig := DriverConfig{Executor: exec} + driver, err := NewMesosExecutorDriver(dconfig) assert.Nil(t, driver) + assert.Error(t, err) } func TestExecutorDriverStartFailedToStartMessenger(t *testing.T) { @@ -206,7 +208,7 @@ func TestExecutorDriverRun(t *testing.T) { assert.Equal(t, mesosproto.Status_DRIVER_STOPPED, stat) }() time.Sleep(time.Millisecond * 1) // allow for things to settle - assert.False(t, driver.stopped) + assert.False(t, driver.Stopped()) assert.Equal(t, mesosproto.Status_DRIVER_RUNNING, driver.Status()) // mannually close it all @@ -394,3 +396,19 @@ func TestExecutorDriverSendFrameworkMessage(t *testing.T) { assert.NoError(t, err) assert.Equal(t, mesosproto.Status_DRIVER_RUNNING, stat) } + +func TestStatusUpdateAckRace_Issue103(t *testing.T) { + driver, _, _ := createTestExecutorDriver(t) + _, err := driver.Start() + assert.NoError(t, err) + + msg := &mesosproto.StatusUpdateAcknowledgementMessage{} + go driver.statusUpdateAcknowledgement(nil, msg) + + taskStatus := util.NewTaskStatus( + util.NewTaskID("test-task-001"), + mesosproto.TaskState_TASK_STAGING, + ) + + driver.SendStatusUpdate(taskStatus) +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/health_checker.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/health_checker.go new file mode 100644 index 00000000000..a87ce133d59 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/health_checker.go @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthchecker + +import ( + "time" + + "github.com/mesos/mesos-go/upid" +) + +// HealthChecker defines the interface of a health checker. +type HealthChecker interface { + // Start will start the health checker, and returns a notification channel. + // if the checker thinks the slave is unhealthy, it will send the timestamp + // via the channel. + Start() <-chan time.Time + // Pause will pause the slave health checker. + Pause() + // Continue will continue the slave health checker with a new slave upid. + Continue(slaveUPID *upid.UPID) + // Stop will stop the health checker. it should be called only once during + // the life span of the checker. + Stop() +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/mocked_slave_health_checker.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/mocked_slave_health_checker.go new file mode 100644 index 00000000000..0bee139a530 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/mocked_slave_health_checker.go @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthchecker + +import ( + "time" + + "github.com/mesos/mesos-go/upid" + "github.com/stretchr/testify/mock" +) + +type MockedHealthChecker struct { + mock.Mock + ch chan time.Time +} + +// NewMockedHealthChecker returns a new mocked health checker. +func NewMockedHealthChecker() *MockedHealthChecker { + return &MockedHealthChecker{ch: make(chan time.Time, 1)} +} + +// Start will start the checker and returns the notification channel. +func (m *MockedHealthChecker) Start() <-chan time.Time { + m.Called() + return m.ch +} + +// Pause will pause the slave health checker. +func (m *MockedHealthChecker) Pause() { + m.Called() +} + +// Continue will continue the slave health checker with a new slave upid. +func (m *MockedHealthChecker) Continue(slaveUPID *upid.UPID) { + m.Called() +} + +// Stop will stop the checker. +func (m *MockedHealthChecker) Stop() { + m.Called() +} + +func (m *MockedHealthChecker) TriggerUnhealthyEvent() { + m.ch <- time.Now() +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/slave_health_checker.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/slave_health_checker.go new file mode 100644 index 00000000000..3627b59ddec --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/slave_health_checker.go @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthchecker + +import ( + "fmt" + "net/http" + "sync" + "time" + + log "github.com/golang/glog" + "github.com/mesos/mesos-go/upid" +) + +const ( + defaultTimeout = time.Second + defaultCheckDuration = time.Second + defaultThreshold = 5 +) + +// SlaveHealthChecker is for checking the slave's health. +type SlaveHealthChecker struct { + sync.RWMutex + slaveUPID *upid.UPID + client *http.Client + threshold int + checkDuration time.Duration + continuousUnhealthyCount int + stop chan struct{} + ch chan time.Time + paused bool +} + +// NewSlaveHealthChecker creates a slave health checker and return a notification channel. +// Each time the checker thinks the slave is unhealthy, it will send a notification through the channel. +func NewSlaveHealthChecker(slaveUPID *upid.UPID, threshold int, checkDuration time.Duration, timeout time.Duration) *SlaveHealthChecker { + checker := &SlaveHealthChecker{ + slaveUPID: slaveUPID, + client: &http.Client{Timeout: timeout}, + threshold: threshold, + checkDuration: checkDuration, + stop: make(chan struct{}), + ch: make(chan time.Time, 1), + } + if timeout == 0 { + checker.client.Timeout = defaultTimeout + } + if checkDuration == 0 { + checker.checkDuration = defaultCheckDuration + } + if threshold <= 0 { + checker.threshold = defaultThreshold + } + return checker +} + +// Start will start the health checker and returns the notification channel. +func (s *SlaveHealthChecker) Start() <-chan time.Time { + go func() { + ticker := time.Tick(s.checkDuration) + for { + select { + case <-ticker: + s.RLock() + if !s.paused { + s.doCheck() + } + s.RUnlock() + case <-s.stop: + return + } + } + }() + return s.ch +} + +// Pause will pause the slave health checker. +func (s *SlaveHealthChecker) Pause() { + s.Lock() + defer s.Unlock() + s.paused = true +} + +// Continue will continue the slave health checker with a new slave upid. +func (s *SlaveHealthChecker) Continue(slaveUPID *upid.UPID) { + s.Lock() + defer s.Unlock() + s.paused = false + s.slaveUPID = slaveUPID +} + +// Stop will stop the slave health checker. +// It should be called only once during the life span of the checker. +func (s *SlaveHealthChecker) Stop() { + close(s.stop) +} + +func (s *SlaveHealthChecker) doCheck() { + path := fmt.Sprintf("http://%s:%s/%s/health", s.slaveUPID.Host, s.slaveUPID.Port, s.slaveUPID.ID) + resp, err := s.client.Head(path) + unhealthy := false + if err != nil { + log.Errorf("Failed to request the health path: %v\n", err) + unhealthy = true + } else if resp.StatusCode != http.StatusOK { + log.Errorf("Failed to request the health path: status: %v\n", resp.StatusCode) + unhealthy = true + } + if unhealthy { + s.continuousUnhealthyCount++ + if s.continuousUnhealthyCount >= s.threshold { + select { + case s.ch <- time.Now(): // If no one is receiving the channel, then just skip it. + default: + } + s.continuousUnhealthyCount = 0 + } + return + } + s.continuousUnhealthyCount = 0 + resp.Body.Close() +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/slave_health_checker_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/slave_health_checker_test.go new file mode 100644 index 00000000000..07cdb7cecbd --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/healthchecker/slave_health_checker_test.go @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthchecker + +import ( + "fmt" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/mesos/mesos-go/upid" + "github.com/stretchr/testify/assert" +) + +type thresholdMonitor struct { + cnt int32 + threshold int32 +} + +func newThresholdMonitor(threshold int) *thresholdMonitor { + return &thresholdMonitor{threshold: int32(threshold)} +} + +// incAndTest returns true if the threshold is reached. +func (t *thresholdMonitor) incAndTest() bool { + if atomic.AddInt32(&t.cnt, 1) >= t.threshold { + return false + } + return true +} + +// blockedServer replies only threshold times, after that +// it will block. +type blockedServer struct { + th *thresholdMonitor + ch chan struct{} +} + +func newBlockedServer(threshold int) *blockedServer { + return &blockedServer{ + th: newThresholdMonitor(threshold), + ch: make(chan struct{}), + } +} + +func (s *blockedServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if s.th.incAndTest() { + return + } + <-s.ch +} + +func (s *blockedServer) stop() { + close(s.ch) +} + +// eofServer will close the connection after it replies for threshold times. +// Thus the health checker will get an EOF error. +type eofServer struct { + th *thresholdMonitor +} + +func newEOFServer(threshold int) *eofServer { + return &eofServer{newThresholdMonitor(threshold)} +} + +func (s *eofServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if s.th.incAndTest() { + return + } + hj := w.(http.Hijacker) + conn, _, err := hj.Hijack() + if err != nil { + panic("Cannot hijack") + } + conn.Close() +} + +// errorStatusCodeServer will reply error status code (e.g. 503) after the +// it replies for threhold time. +type errorStatusCodeServer struct { + th *thresholdMonitor +} + +func newErrorStatusServer(threshold int) *errorStatusCodeServer { + return &errorStatusCodeServer{newThresholdMonitor(threshold)} +} + +func (s *errorStatusCodeServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if s.th.incAndTest() { + return + } + w.WriteHeader(http.StatusServiceUnavailable) +} + +// goodServer always returns status ok. +type goodServer bool + +func (s *goodServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {} + +// partitionedServer returns status ok at some first requests. +// Then it will block for a while, and then reply again. +type partitionedServer struct { + healthyCnt int32 + partitionCnt int32 + cnt int32 + mutex *sync.Mutex + cond *sync.Cond +} + +func newPartitionedServer(healthyCnt, partitionCnt int) *partitionedServer { + mutex := new(sync.Mutex) + cond := sync.NewCond(mutex) + return &partitionedServer{ + healthyCnt: int32(healthyCnt), + partitionCnt: int32(partitionCnt), + mutex: mutex, + cond: cond, + } +} + +func (s *partitionedServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + cnt := atomic.AddInt32(&s.cnt, 1) + if cnt < s.healthyCnt { + return + } + if cnt < s.healthyCnt+s.partitionCnt { + s.mutex.Lock() + defer s.mutex.Unlock() + s.cond.Wait() + return + } + s.mutex.Lock() + defer s.mutex.Unlock() + s.cond.Broadcast() +} + +func TestSlaveHealthCheckerFailedOnBlockedSlave(t *testing.T) { + s := newBlockedServer(5) + ts := httptest.NewUnstartedServer(s) + ts.Start() + defer ts.Close() + + upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String())) + assert.NoError(t, err) + + checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10) + ch := checker.Start() + defer checker.Stop() + + select { + case <-time.After(time.Second): + s.stop() + t.Fatal("timeout") + case <-ch: + s.stop() + assert.True(t, atomic.LoadInt32(&s.th.cnt) > 10) + } +} + +func TestSlaveHealthCheckerFailedOnEOFSlave(t *testing.T) { + s := newEOFServer(5) + ts := httptest.NewUnstartedServer(s) + ts.Start() + defer ts.Close() + + upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String())) + assert.NoError(t, err) + + checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10) + ch := checker.Start() + defer checker.Stop() + + select { + case <-time.After(time.Second): + t.Fatal("timeout") + case <-ch: + assert.True(t, atomic.LoadInt32(&s.th.cnt) > 10) + } +} + +func TestSlaveHealthCheckerFailedOnErrorStatusSlave(t *testing.T) { + s := newErrorStatusServer(5) + ts := httptest.NewUnstartedServer(s) + ts.Start() + defer ts.Close() + + upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String())) + assert.NoError(t, err) + + checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10) + ch := checker.Start() + defer checker.Stop() + + select { + case <-time.After(time.Second): + t.Fatal("timeout") + case <-ch: + assert.True(t, atomic.LoadInt32(&s.th.cnt) > 10) + } +} + +func TestSlaveHealthCheckerSucceed(t *testing.T) { + s := new(goodServer) + ts := httptest.NewUnstartedServer(s) + ts.Start() + defer ts.Close() + + upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String())) + assert.NoError(t, err) + + checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10) + ch := checker.Start() + defer checker.Stop() + + select { + case <-time.After(time.Second): + assert.Equal(t, 0, checker.continuousUnhealthyCount) + case <-ch: + t.Fatal("Shouldn't get unhealthy notification") + } +} + +func TestSlaveHealthCheckerPartitonedSlave(t *testing.T) { + s := newPartitionedServer(5, 9) + ts := httptest.NewUnstartedServer(s) + ts.Start() + defer ts.Close() + + upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String())) + assert.NoError(t, err) + + checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10) + ch := checker.Start() + defer checker.Stop() + + select { + case <-time.After(time.Second): + assert.Equal(t, 0, checker.continuousUnhealthyCount) + case <-ch: + t.Fatal("Shouldn't get unhealthy notification") + } +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/mesos/doc.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesos/doc.go new file mode 100644 index 00000000000..6c0973f34c2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/mesos/doc.go @@ -0,0 +1,3 @@ +// This package was previously the home of the native bindings. Please use the +// native branch if you need to build against the native bindings. +package mesos diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/README.md b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/README.md index da0673e78a0..d3535253d31 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/README.md +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/README.md @@ -1,7 +1,7 @@ ####Benchmark of the messenger. ```shell -$ go test -v -run=Benckmark* -bench=. +$ go test -v -run=Benckmark* -bench=. PASS BenchmarkMessengerSendSmallMessage 50000 70568 ns/op BenchmarkMessengerSendMediumMessage 50000 70265 ns/op @@ -29,7 +29,7 @@ BenchmarkMessengerSendRecvLargeMessage-4 50000 45472 ns/op BenchmarkMessengerSendRecvMixedMessage-4 50000 47393 ns/op ok github.com/mesos/mesos-go/messenger 105.173s ``` - + ####environment: ``` diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go index 30370b04835..cfd7f2583a9 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go @@ -21,7 +21,6 @@ package messenger import ( "bytes" "fmt" - "github.com/mesos/mesos-go/upid" "io/ioutil" "net" "net/http" @@ -33,6 +32,7 @@ import ( "time" log "github.com/golang/glog" + "github.com/mesos/mesos-go/upid" "golang.org/x/net/context" ) @@ -235,7 +235,14 @@ func (t *HTTPTransporter) listen() error { } else { host = t.upid.Host } - port := t.upid.Port + + var port string + if t.upid.Port != "" { + port = t.upid.Port + } else { + port = "0" + } + // NOTE: Explicitly specifies IPv4 because Libprocess // only supports IPv4 for now. ln, err := net.Listen("tcp4", net.JoinHostPort(host, port)) @@ -245,7 +252,15 @@ func (t *HTTPTransporter) listen() error { } // Save the host:port in case they are not specified in upid. host, port, _ = net.SplitHostPort(ln.Addr().String()) - t.upid.Host, t.upid.Port = host, port + + if len(t.upid.Host) == 0 { + t.upid.Host = host + } + + if len(t.upid.Port) == 0 || t.upid.Port == "0" { + t.upid.Port = port + } + t.listener = ln return nil } diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go index e1d14096526..f3dc1e56bf4 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go @@ -2,6 +2,7 @@ package messenger import ( "fmt" + "net" "net/http" "net/http/httptest" "regexp" @@ -266,6 +267,66 @@ func TestTransporterStartAndStop(t *testing.T) { } } +func TestMutatedHostUPid(t *testing.T) { + serverId := "testserver" + serverPort := getNewPort() + serverHost := "127.0.0.1" + serverAddr := serverHost + ":" + strconv.Itoa(serverPort) + + // override the upid.Host with this listener IP + addr := net.ParseIP("127.0.0.2") + + // setup receiver (server) process + uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr)) + assert.NoError(t, err) + receiver := NewHTTPTransporter(uPid, addr) + + err = receiver.listen() + assert.NoError(t, err) + + if receiver.upid.Host != "127.0.0.1" { + t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host) + } + + if receiver.upid.Port != strconv.Itoa(serverPort) { + t.Fatalf("receiver.upid.Port was expected to return %d, got %s\n", serverPort, receiver.upid.Port) + } +} + +func TestEmptyHostPortUPid(t *testing.T) { + serverId := "testserver" + serverPort := getNewPort() + serverHost := "127.0.0.1" + serverAddr := serverHost + ":" + strconv.Itoa(serverPort) + + // setup receiver (server) process + uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr)) + assert.NoError(t, err) + + // Unset upid host and port + uPid.Host = "" + uPid.Port = "" + + // override the upid.Host with this listener IP + addr := net.ParseIP("127.0.0.2") + + receiver := NewHTTPTransporter(uPid, addr) + + err = receiver.listen() + assert.NoError(t, err) + + // This should be the host that overrides as uPid.Host is empty + if receiver.upid.Host != "127.0.0.2" { + t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host) + } + + // This should end up being a random port, not the server port as uPid + // port is empty + if receiver.upid.Port == strconv.Itoa(serverPort) { + t.Fatalf("receiver.upid.Port was not expected to return %d, got %s\n", serverPort, receiver.upid.Port) + } +} + func makeMockServer(path string, handler func(rsp http.ResponseWriter, req *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(path, handler) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go index 5b242e5bce3..c81ad31853c 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go @@ -76,41 +76,62 @@ type MesosMessenger struct { tr Transporter } -// create a new default messenger (HTTP). If a non-nil, non-wildcard bindingAddress is -// specified then it will be used for both the UPID and Transport binding address. Otherwise -// hostname is resolved to an IP address and the UPID.Host is set to that address and the -// bindingAddress is passed through to the Transport. +// ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to +// determine the binding-address used for both the UPID.Host and Transport binding address. func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16) (Messenger, error) { upid := &upid.UPID{ ID: proc.Label(), Port: strconv.Itoa(int(port)), } + host, err := UPIDBindingAddress(hostname, bindingAddress) + if err != nil { + return nil, err + } + upid.Host = host + return NewHttpWithBindingAddress(upid, bindingAddress), nil +} + +// UPIDBindingAddress determines the value of UPID.Host that will be used to build +// a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used +// for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP +// address and the UPID.Host is set to that address and the bindingAddress is passed through +// to the Transport. +func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) { + upidHost := "" if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() { - upid.Host = bindingAddress.String() + upidHost = bindingAddress.String() } else { - ips, err := net.LookupIP(hostname) - if err != nil { - return nil, err + if hostname == "" || hostname == "0.0.0.0" { + return "", fmt.Errorf("invalid hostname (%q) specified with binding address %v", hostname, bindingAddress) } - // try to find an ipv4 and use that - ip := net.IP(nil) - for _, addr := range ips { - if ip = addr.To4(); ip != nil { - break - } + ip := net.ParseIP(hostname) + if ip != nil { + ip = ip.To4() } if ip == nil { - // no ipv4? best guess, just take the first addr - if len(ips) > 0 { - ip = ips[0] - log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip) - } else { - return nil, fmt.Errorf("failed to determine IP address for host '%v'", hostname) + ips, err := net.LookupIP(hostname) + if err != nil { + return "", err + } + // try to find an ipv4 and use that + for _, addr := range ips { + if ip = addr.To4(); ip != nil { + break + } + } + if ip == nil { + // no ipv4? best guess, just take the first addr + if len(ips) > 0 { + ip = ips[0] + log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip) + } else { + return "", fmt.Errorf("failed to determine IP address for host '%v'", hostname) + } } } - upid.Host = ip.String() + upidHost = ip.String() } - return NewHttpWithBindingAddress(upid, bindingAddress), nil + return upidHost, nil } // NewMesosMessenger creates a new mesos messenger. diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go index 096f201116c..4a189537004 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go @@ -3,6 +3,7 @@ package messenger import ( "fmt" "math/rand" + "net" "net/http" "net/http/httptest" "strconv" @@ -431,3 +432,35 @@ func BenchmarkMessengerSendRecvMixedMessage(b *testing.B) { } globalWG.Wait() } + +func TestUPIDBindingAddress(t *testing.T) { + tt := []struct { + hostname string + binding net.IP + expected string + }{ + {"", nil, ""}, + {"", net.IPv4(1, 2, 3, 4), "1.2.3.4"}, + {"", net.IPv4(0, 0, 0, 0), ""}, + {"localhost", nil, "127.0.0.1"}, + {"localhost", net.IPv4(5, 6, 7, 8), "5.6.7.8"}, + {"localhost", net.IPv4(0, 0, 0, 0), "127.0.0.1"}, + {"0.0.0.0", nil, ""}, + {"7.8.9.1", nil, "7.8.9.1"}, + {"7.8.9.1", net.IPv4(0, 0, 0, 0), "7.8.9.1"}, + {"7.8.9.1", net.IPv4(8, 9, 1, 2), "8.9.1.2"}, + } + + for i, tc := range tt { + actual, err := UPIDBindingAddress(tc.hostname, tc.binding) + if err != nil && tc.expected != "" { + t.Fatalf("test case %d failed; expected %q instead of error %v", i+1, tc.expected, err) + } + if err == nil && actual != tc.expected { + t.Fatalf("test case %d failed; expected %q instead of %q", i+1, tc.expected, actual) + } + if err != nil { + t.Logf("test case %d; received expected error %v", i+1, err) + } + } +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go index 439e9977f87..90204994af5 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go @@ -478,7 +478,7 @@ func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg } func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) { - log.V(1).Infoln("Handling resource offers.") + log.V(2).Infoln("Handling resource offers.") msg := pbMsg.(*mesos.ResourceOffersMessage) if driver.Status() == mesos.Status_DRIVER_ABORTED { @@ -500,7 +500,7 @@ func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg prot for i, offer := range msg.Offers { if pid, err := upid.Parse(pidStrings[i]); err == nil { driver.cache.putOffer(offer, pid) - log.V(1).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid) + log.V(2).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid) } else { log.Warningf("Failed to parse offer PID '%v': %v", pid, err) } @@ -822,11 +822,15 @@ func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) { return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat) } - if driver.connected && failover { + if driver.connected && !failover { // unregister the framework + log.Infoln("Unregistering the scheduler driver") message := &mesos.UnregisterFrameworkMessage{ FrameworkId: driver.FrameworkInfo.Id, } + //TODO(jdef) this is actually a little racy: we send an 'unregister' message but then + // immediately afterward the messenger is stopped in driver.stop(). so the unregister message + // may not actually end up being sent out. if err := driver.send(driver.MasterPid, message); err != nil { log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err) return driver.stop(mesos.Status_DRIVER_ABORTED) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go index 423459f6a8a..add643262fa 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go @@ -20,7 +20,6 @@ package scheduler import ( "fmt" - "os" "os/user" "sync" "testing" @@ -38,6 +37,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "golang.org/x/net/context" ) var ( @@ -137,7 +137,7 @@ func TestSchedulerDriverNew(t *testing.T) { driver := newTestSchedulerDriver(t, NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil) user, _ := user.Current() assert.Equal(t, user.Username, driver.FrameworkInfo.GetUser()) - host, _ := os.Hostname() + host := util.GetHostname("") assert.Equal(t, host, driver.FrameworkInfo.GetHostname()) } @@ -335,9 +335,19 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStopUnstarted() { suite.Equal(mesos.Status_DRIVER_NOT_STARTED, stat) } -func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() { +type msgTracker struct { + *messenger.MockedMessenger + lastMessage proto.Message +} + +func (m *msgTracker) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error { + m.lastMessage = msg + return m.MockedMessenger.Send(ctx, upid, msg) +} + +func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithoutFailover() { // Set expections and return values. - messenger := messenger.NewMockedMessenger() + messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()} messenger.On("Start").Return(nil) messenger.On("UPID").Return(&upid.UPID{}) messenger.On("Send").Return(nil) @@ -357,9 +367,54 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() { suite.False(driver.Stopped()) suite.Equal(mesos.Status_DRIVER_RUNNING, driver.Status()) + driver.connected = true // pretend that we're already registered driver.Stop(false) - time.Sleep(time.Millisecond * 1) + + msg := messenger.lastMessage + suite.NotNil(msg) + _, isUnregMsg := msg.(proto.Message) + suite.True(isUnregMsg, "expected UnregisterFrameworkMessage instead of %+v", msg) + + suite.True(driver.Stopped()) + suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status()) +} + +func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithFailover() { + // Set expections and return values. + messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()} + messenger.On("Start").Return(nil) + messenger.On("UPID").Return(&upid.UPID{}) + messenger.On("Send").Return(nil) + messenger.On("Stop").Return(nil) + messenger.On("Route").Return(nil) + + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) + driver.messenger = messenger + suite.True(driver.Stopped()) + + stat, err := driver.Start() + suite.NoError(err) + suite.Equal(mesos.Status_DRIVER_RUNNING, stat) + suite.False(driver.Stopped()) + driver.connected = true // pretend that we're already registered + + go func() { + // Run() blocks until the driver is stopped or aborted + stat, err := driver.Join() + suite.NoError(err) + suite.Equal(mesos.Status_DRIVER_STOPPED, stat) + }() + + // wait for Join() to begin blocking (so that it has already validated the driver state) + time.Sleep(200 * time.Millisecond) + + driver.Stop(true) // true = scheduler failover + msg := messenger.lastMessage + + // we're expecting that lastMessage is nil because when failing over there's no + // 'unregister' message sent by the scheduler. + suite.Nil(msg) suite.True(driver.Stopped()) suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status()) @@ -410,7 +465,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLunchTasksUnstarted() { suite.True(driver.Stopped()) stat, err := driver.LaunchTasks( - []*mesos.OfferID{&mesos.OfferID{}}, + []*mesos.OfferID{{}}, []*mesos.TaskInfo{}, &mesos.Filters{}, ) @@ -514,7 +569,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasks() { tasks := []*mesos.TaskInfo{task} stat, err := driver.LaunchTasks( - []*mesos.OfferID{&mesos.OfferID{}}, + []*mesos.OfferID{{}}, tasks, &mesos.Filters{}, ) @@ -565,7 +620,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverRequestResources() { stat, err := driver.RequestResources( []*mesos.Request{ - &mesos.Request{ + { SlaveId: util.NewSlaveID("test-slave-001"), Resources: []*mesos.Resource{ util.NewScalarResource("test-res-001", 33.00), diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/testutil/testingutil.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/testutil/testingutil.go new file mode 100644 index 00000000000..5ec1d58ca12 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/testutil/testingutil.go @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//Collection of resources for teting mesos artifacts. +package testutil + +import ( + "bytes" + "fmt" + "github.com/gogo/protobuf/proto" + log "github.com/golang/glog" + "github.com/mesos/mesos-go/upid" + "github.com/stretchr/testify/assert" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "reflect" + "testing" +) + +//MockMesosHttpProcess represents a remote http process: master or slave. +type MockMesosHttpServer struct { + PID *upid.UPID + Addr string + server *httptest.Server + t *testing.T + when map[string]http.HandlerFunc +} + +type When interface { + Do(http.HandlerFunc) +} + +type WhenFunc func(http.HandlerFunc) + +func (w WhenFunc) Do(f http.HandlerFunc) { + w(f) +} + +func (m *MockMesosHttpServer) On(uri string) When { + log.V(2).Infof("when %v do something special", uri) + return WhenFunc(func(f http.HandlerFunc) { + log.V(2).Infof("registered callback for %v", uri) + m.when[uri] = f + }) +} + +func NewMockMasterHttpServer(t *testing.T, handler func(rsp http.ResponseWriter, req *http.Request)) *MockMesosHttpServer { + var server *httptest.Server + when := make(map[string]http.HandlerFunc) + stateHandler := func(rsp http.ResponseWriter, req *http.Request) { + if "/state.json" == req.RequestURI { + state := fmt.Sprintf(`{ "leader": "master@%v" }`, server.Listener.Addr()) + log.V(1).Infof("returning JSON %v", state) + io.WriteString(rsp, state) + } else if f, found := when[req.RequestURI]; found { + f(rsp, req) + } else { + handler(rsp, req) + } + } + server = httptest.NewServer(http.HandlerFunc(stateHandler)) + assert.NotNil(t, server) + addr := server.Listener.Addr().String() + pid, err := upid.Parse("master@" + addr) + assert.NoError(t, err) + assert.NotNil(t, pid) + log.Infoln("Created test Master http server with PID", pid.String()) + return &MockMesosHttpServer{PID: pid, Addr: addr, server: server, t: t, when: when} +} + +func NewMockSlaveHttpServer(t *testing.T, handler func(rsp http.ResponseWriter, req *http.Request)) *MockMesosHttpServer { + server := httptest.NewServer(http.HandlerFunc(handler)) + assert.NotNil(t, server) + addr := server.Listener.Addr().String() + pid, err := upid.Parse("slave(1)@" + addr) + assert.NoError(t, err) + assert.NotNil(t, pid) + assert.NoError(t, os.Setenv("MESOS_SLAVE_PID", pid.String())) + assert.NoError(t, os.Setenv("MESOS_SLAVE_ID", "test-slave-001")) + log.Infoln("Created test Slave http server with PID", pid.String()) + return &MockMesosHttpServer{PID: pid, Addr: addr, server: server, t: t} +} + +func (s *MockMesosHttpServer) Close() { + s.server.Close() +} + +//MockMesosClient Http client to communicate with mesos processes (master,sched,exec) +type MockMesosClient struct { + pid *upid.UPID + t *testing.T +} + +func NewMockMesosClient(t *testing.T, pid *upid.UPID) *MockMesosClient { + return &MockMesosClient{t: t, pid: pid} +} + +// sendMessage Mocks sending event messages to a processes such as master, sched or exec. +func (c *MockMesosClient) SendMessage(targetPid *upid.UPID, message proto.Message) { + if c.t == nil { + panic("MockMesosClient needs a testing context.") + } + + messageName := reflect.TypeOf(message).Elem().Name() + data, err := proto.Marshal(message) + assert.NoError(c.t, err) + hostport := net.JoinHostPort(targetPid.Host, targetPid.Port) + targetURL := fmt.Sprintf("http://%s/%s/mesos.internal.%s", hostport, targetPid.ID, messageName) + log.Infoln("MockMesosClient Sending message to", targetURL) + req, err := http.NewRequest("POST", targetURL, bytes.NewReader(data)) + assert.NoError(c.t, err) + req.Header.Add("Libprocess-From", c.pid.String()) + req.Header.Add("Content-Type", "application/x-protobuf") + resp, err := http.DefaultClient.Do(req) + assert.NoError(c.t, err) + assert.Equal(c.t, http.StatusAccepted, resp.StatusCode) +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go index a7470b47ff8..99d446bfd34 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go @@ -1,21 +1,12 @@ package upid import ( - "math/rand" - "strings" "testing" + "testing/quick" "github.com/stretchr/testify/assert" ) -func generateRandomString() string { - b := make([]byte, rand.Intn(1024)) - for i := range b { - b[i] = byte(rand.Int()) - } - return strings.Replace(string(b), "@", "", -1) -} - func TestUPIDParse(t *testing.T) { u, err := Parse("mesos@foo:bar") assert.Nil(t, u) @@ -29,17 +20,10 @@ func TestUPIDParse(t *testing.T) { assert.Nil(t, u) assert.Error(t, err) - // Simple fuzzy test. - for i := 0; i < 100000; i++ { - ra := generateRandomString() - u, err = Parse(ra) - if u != nil { - println(ra) - } - assert.Nil(t, u) - assert.Error(t, err) - } - + assert.Nil(t, quick.Check(func(s string) bool { + u, err := Parse(s) + return u == nil && err != nil + }, &quick.Config{MaxCount: 100000})) } func TestUPIDString(t *testing.T) {