diff --git a/contrib/mesos/pkg/scheduler/framework.go b/contrib/mesos/pkg/scheduler/framework.go index 4a1d3bbc435..951523f842c 100644 --- a/contrib/mesos/pkg/scheduler/framework.go +++ b/contrib/mesos/pkg/scheduler/framework.go @@ -27,42 +27,42 @@ import ( "k8s.io/kubernetes/pkg/api" ) -type mesosFramework struct { +type MesosFramework struct { sync.Mutex - mesosScheduler *MesosScheduler + MesosScheduler *MesosScheduler } -func (fw *mesosFramework) PodScheduler() podschedulers.PodScheduler { - return fw.mesosScheduler.podScheduler +func (fw *MesosFramework) PodScheduler() podschedulers.PodScheduler { + return fw.MesosScheduler.podScheduler } -func (fw *mesosFramework) Offers() offers.Registry { - return fw.mesosScheduler.offers +func (fw *MesosFramework) Offers() offers.Registry { + return fw.MesosScheduler.offers } -func (fw *mesosFramework) Tasks() podtask.Registry { - return fw.mesosScheduler.taskRegistry +func (fw *MesosFramework) Tasks() podtask.Registry { + return fw.MesosScheduler.taskRegistry } -func (fw *mesosFramework) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) { - return podtask.New(ctx, "", *pod, fw.mesosScheduler.executor) +func (fw *MesosFramework) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) { + return podtask.New(ctx, "", *pod, fw.MesosScheduler.executor) } -func (fw *mesosFramework) SlaveHostNameFor(id string) string { - return fw.mesosScheduler.slaveHostNames.HostName(id) +func (fw *MesosFramework) SlaveHostNameFor(id string) string { + return fw.MesosScheduler.slaveHostNames.HostName(id) } -func (fw *mesosFramework) KillTask(taskId string) error { +func (fw *MesosFramework) KillTask(taskId string) error { killTaskId := mutil.NewTaskID(taskId) - _, err := fw.mesosScheduler.driver.KillTask(killTaskId) + _, err := fw.MesosScheduler.driver.KillTask(killTaskId) return err } -func (fw *mesosFramework) LaunchTask(task *podtask.T) error { +func (fw *MesosFramework) LaunchTask(task *podtask.T) error { // assume caller is holding scheduler lock taskList := []*mesos.TaskInfo{task.BuildTaskInfo()} offerIds := []*mesos.OfferID{task.Offer.Details().Id} filters := &mesos.Filters{} - _, err := fw.mesosScheduler.driver.LaunchTasks(offerIds, taskList, filters) + _, err := fw.MesosScheduler.driver.LaunchTasks(offerIds, taskList, filters) return err -} \ No newline at end of file +} diff --git a/contrib/mesos/pkg/scheduler/integration_test.go b/contrib/mesos/pkg/scheduler/integration_test.go index 36e4d8e432f..4e20f244a1c 100644 --- a/contrib/mesos/pkg/scheduler/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration_test.go @@ -461,14 +461,16 @@ func newLifecycleTest(t *testing.T) lifecycleTest { ), ) + client := client.NewOrDie(&client.Config{ + Host: apiServer.server.URL, + Version: testapi.Default.Version(), + }) + c := *schedcfg.CreateDefaultConfig() mesosScheduler := New(Config{ - Executor: ei, - Client: client.NewOrDie(&client.Config{ - Host: apiServer.server.URL, - Version: testapi.Default.Version(), - }), + Executor: ei, + Client: client, PodScheduler: podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode), - SchedulerConfig: *schedcfg.CreateDefaultConfig(), + SchedulerConfig: c, LookupNode: apiServer.LookupNode, }) @@ -480,12 +482,9 @@ func newLifecycleTest(t *testing.T) lifecycleTest { // create scheduler process schedulerProc := ha.New(mesosScheduler) - // get plugin config from it - config := mesosScheduler.NewSchedulerLoopConfig( - schedulerProc.Terminal(), - http.DefaultServeMux, - &podsListWatch.ListWatch, - ) + // get SchedulerLoop config from it + fw := &MesosFramework{MesosScheduler: mesosScheduler} + config := operations.NewSchedulerLoopConfig(&c, fw, client, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) assert.NotNil(config) // make events observable diff --git a/contrib/mesos/pkg/scheduler/podstoreadapter.go b/contrib/mesos/pkg/scheduler/operations/podstoreadapter.go similarity index 98% rename from contrib/mesos/pkg/scheduler/podstoreadapter.go rename to contrib/mesos/pkg/scheduler/operations/podstoreadapter.go index 4553bb6a773..3b7c698787c 100644 --- a/contrib/mesos/pkg/scheduler/podstoreadapter.go +++ b/contrib/mesos/pkg/scheduler/operations/podstoreadapter.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package operations import ( "k8s.io/kubernetes/contrib/mesos/pkg/queue" @@ -60,4 +60,4 @@ func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string) newobjs[i] = &queuer.Pod{Pod: pod} } return psa.FIFO.Replace(newobjs, resourceVersion) -} \ No newline at end of file +} diff --git a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go index 2e28e3c4400..3b2fab17147 100644 --- a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go +++ b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go @@ -20,13 +20,19 @@ import ( "time" log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/backoff" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/fields" + "net/http" ) const ( @@ -56,6 +62,55 @@ type SchedulerLoopConfig struct { Starting chan struct{} // startup latch } +// NewDefaultSchedulerLoopConfig creates a SchedulerLoop +func NewDefaultSchedulerLoopConfig(c *config.Config, fw types.Framework, client *client.Client, terminate <-chan struct{}, mux *http.ServeMux) *SchedulerLoopConfig { + // use ListWatch watching pods using the client by default + lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything()) + return NewSchedulerLoopConfig(c, fw, client, terminate, mux, lw) +} + +func NewSchedulerLoopConfig(c *config.Config, fw types.Framework, client *client.Client, terminate <-chan struct{}, mux *http.ServeMux, + podsWatcher *cache.ListWatch) *SchedulerLoopConfig { + + // Watch and queue pods that need scheduling. + updates := make(chan queue.Entry, c.UpdatesBacklog) + podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} + reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0) + + // lock that guards critial sections that involve transferring pods from + // the store (cache) to the scheduling queue; its purpose is to maintain + // an ordering (vs interleaving) of operations that's easier to reason about. + + q := queuer.New(podUpdates) + podDeleter := NewDeleter(fw, q) + podReconciler := NewPodReconciler(fw, client, q, podDeleter) + bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration) + eh := NewErrorHandler(fw, bo, q) + startLatch := make(chan struct{}) + eventBroadcaster := record.NewBroadcaster() + runtime.On(startLatch, func() { + eventBroadcaster.StartRecordingToSink(client.Events("")) + reflector.Run() // TODO(jdef) should listen for termination + podDeleter.Run(updates, terminate) + q.Run(terminate) + + q.InstallDebugHandlers(mux) + podtask.InstallDebugHandlers(fw.Tasks(), mux) + }) + return &SchedulerLoopConfig{ + Algorithm: NewSchedulerAlgorithm(fw, podUpdates), + Binder: NewBinder(fw), + NextPod: q.Yield, + Error: eh.Error, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), + Fw: fw, + Client: client, + Qr: q, + Pr: podReconciler, + Starting: startLatch, + } +} + func NewSchedulerLoop(c *SchedulerLoopConfig) SchedulerLoopInterface { return &SchedulerLoop{ algorithm: c.Algorithm, diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index df48e1476b1..4359cb05a54 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -28,14 +28,12 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" mutil "github.com/mesos/mesos-go/mesosutil" bindings "github.com/mesos/mesos-go/scheduler" - "k8s.io/kubernetes/contrib/mesos/pkg/backoff" execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/offers" offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/proc" - "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" @@ -44,13 +42,10 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet/container" @@ -757,52 +752,3 @@ func (ks *MesosScheduler) recoverTasks() error { } return nil } - -// Create creates a scheduler plugin and all supporting background functions. -func (k *MesosScheduler) NewDefaultSchedulerLoopConfig(terminate <-chan struct{}, mux *http.ServeMux) *operations.SchedulerLoopConfig { - // use ListWatch watching pods using the client by default - lw := cache.NewListWatchFromClient(k.client, "pods", api.NamespaceAll, fields.Everything()) - return k.NewSchedulerLoopConfig(terminate, mux, lw) -} - -func (k *MesosScheduler) NewSchedulerLoopConfig(terminate <-chan struct{}, mux *http.ServeMux, - podsWatcher *cache.ListWatch) *operations.SchedulerLoopConfig { - - // Watch and queue pods that need scheduling. - updates := make(chan queue.Entry, k.schedulerConfig.UpdatesBacklog) - podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} - reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0) - - // lock that guards critial sections that involve transferring pods from - // the store (cache) to the scheduling queue; its purpose is to maintain - // an ordering (vs interleaving) of operations that's easier to reason about. - scheduler := &mesosFramework{mesosScheduler: k} - q := queuer.New(podUpdates) - podDeleter := operations.NewDeleter(scheduler, q) - podReconciler := operations.NewPodReconciler(scheduler, k.client, q, podDeleter) - bo := backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration) - eh := operations.NewErrorHandler(scheduler, bo, q) - startLatch := make(chan struct{}) - eventBroadcaster := record.NewBroadcaster() - runtime.On(startLatch, func() { - eventBroadcaster.StartRecordingToSink(k.client.Events("")) - reflector.Run() // TODO(jdef) should listen for termination - podDeleter.Run(updates, terminate) - q.Run(terminate) - - q.InstallDebugHandlers(mux) - podtask.InstallDebugHandlers(k.taskRegistry, mux) - }) - return &operations.SchedulerLoopConfig{ - Algorithm: operations.NewSchedulerAlgorithm(scheduler, podUpdates), - Binder: operations.NewBinder(scheduler), - NextPod: q.Yield, - Error: eh.Error, - Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), - Fw: scheduler, - Client: k.client, - Qr: q, - Pr: podReconciler, - Starting: startLatch, - } -} diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index eaceac8b3b3..2b853d44eeb 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -59,6 +59,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" @@ -73,7 +74,6 @@ import ( "k8s.io/kubernetes/pkg/master/ports" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/tools" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" ) const ( @@ -719,7 +719,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config } fcfs := podschedulers.NewFCFSPodScheduler(as, lookupNode) - mesosPodScheduler := scheduler.New(scheduler.Config{ + mesosScheduler := scheduler.New(scheduler.Config{ SchedulerConfig: *sc, Executor: executor, PodScheduler: fcfs, @@ -743,7 +743,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config log.Fatalf("Misconfigured mesos framework: %v", err) } - schedulerProcess := ha.New(mesosPodScheduler) + schedulerProcess := ha.New(mesosScheduler) dconfig := &bindings.DriverConfig{ Scheduler: schedulerProcess, Framework: info, @@ -759,13 +759,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config }, } - loop := operations.NewSchedulerLoop(mesosPodScheduler.NewDefaultSchedulerLoopConfig(schedulerProcess.Terminal(), s.mux)) - runtime.On(mesosPodScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) }) - runtime.On(mesosPodScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal())) + fw := &scheduler.MesosFramework{MesosScheduler: mesosScheduler} + loop := operations.NewSchedulerLoop(operations.NewDefaultSchedulerLoopConfig(sc, fw, client, schedulerProcess.Terminal(), s.mux)) + runtime.On(mesosScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) }) + runtime.On(mesosScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal())) driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) { log.V(1).Infoln("performing deferred initialization") - if err = mesosPodScheduler.Init(schedulerProcess.Master(), loop, s.mux); err != nil { + if err = mesosScheduler.Init(schedulerProcess.Master(), loop, s.mux); err != nil { return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err) } log.V(1).Infoln("deferred init complete")