diff --git a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go index 007457b6e48..90c1d264232 100644 --- a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go +++ b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go @@ -22,10 +22,10 @@ import ( log "github.com/golang/glog" "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/queue" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" ) @@ -36,12 +36,12 @@ type SchedulerAlgorithm interface { // SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface type schedulerAlgorithm struct { - sched types.Scheduler + sched scheduler.Scheduler podUpdates queue.FIFO podScheduler podschedulers.PodScheduler } -func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) SchedulerAlgorithm { +func NewSchedulerAlgorithm(sched scheduler.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) SchedulerAlgorithm { return &schedulerAlgorithm{ sched: sched, podUpdates: podUpdates, diff --git a/contrib/mesos/pkg/scheduler/components/binder/binder.go b/contrib/mesos/pkg/scheduler/components/binder/binder.go index 81e999b9040..751ddce5a6a 100644 --- a/contrib/mesos/pkg/scheduler/components/binder/binder.go +++ b/contrib/mesos/pkg/scheduler/components/binder/binder.go @@ -21,10 +21,10 @@ import ( "strconv" log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" ) @@ -33,10 +33,10 @@ type Binder interface { } type binder struct { - sched types.Scheduler + sched scheduler.Scheduler } -func NewBinder(sched types.Scheduler) Binder { +func NewBinder(sched scheduler.Scheduler) Binder { return &binder{ sched: sched, } diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter.go index 9e8df2f33ec..52ea1e9140f 100644 --- a/contrib/mesos/pkg/scheduler/components/deleter/deleter.go +++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter.go @@ -25,8 +25,8 @@ import ( merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" ) type Deleter interface { @@ -35,11 +35,11 @@ type Deleter interface { } type deleter struct { - sched types.Scheduler + sched scheduler.Scheduler qr *queuer.Queuer } -func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) Deleter { +func NewDeleter(sched scheduler.Scheduler, qr *queuer.Queuer) Deleter { return &deleter{ sched: sched, qr: qr, diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go index 7d40dafa078..6e962341c0e 100644 --- a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go +++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go @@ -24,7 +24,7 @@ import ( merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" + types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/pkg/api" ) diff --git a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go index 403e250c628..ee0e9a9f9db 100644 --- a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go +++ b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go @@ -21,11 +21,11 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" "k8s.io/kubernetes/contrib/mesos/pkg/backoff" "k8s.io/kubernetes/contrib/mesos/pkg/queue" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/util" ) @@ -35,13 +35,13 @@ type ErrorHandler interface { } type errorHandler struct { - sched types.Scheduler + sched scheduler.Scheduler backoff *backoff.Backoff qr *queuer.Queuer podScheduler podschedulers.PodScheduler } -func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) ErrorHandler { +func NewErrorHandler(sched scheduler.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) ErrorHandler { return &errorHandler{ sched: sched, backoff: backoff, diff --git a/contrib/mesos/pkg/scheduler/components/framework/framework.go b/contrib/mesos/pkg/scheduler/components/framework/framework.go index 5a588c0c8a7..08b18031615 100644 --- a/contrib/mesos/pkg/scheduler/components/framework/framework.go +++ b/contrib/mesos/pkg/scheduler/components/framework/framework.go @@ -37,6 +37,7 @@ import ( offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/proc" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/tasksreconciler" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" @@ -44,7 +45,6 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -59,7 +59,7 @@ import ( type Framework interface { mscheduler.Scheduler - Init(sched types.Scheduler, electedMaster proc.Process, mux *http.ServeMux) error + Init(sched scheduler.Scheduler, electedMaster proc.Process, mux *http.ServeMux) error Registration() <-chan struct{} Offers() offers.Registry LaunchTask(t *podtask.T) error @@ -72,7 +72,7 @@ type framework struct { *sync.RWMutex // Config related, write-once - sched types.Scheduler + sched scheduler.Scheduler schedulerConfig *schedcfg.Config executor *mesos.ExecutorInfo executorGroup uint64 @@ -168,7 +168,7 @@ func New(config Config) Framework { return k } -func (k *framework) Init(sched types.Scheduler, electedMaster proc.Process, mux *http.ServeMux) error { +func (k *framework) Init(sched scheduler.Scheduler, electedMaster proc.Process, mux *http.ServeMux) error { log.V(1).Infoln("initializing kubernetes mesos scheduler") k.sched = sched diff --git a/contrib/mesos/pkg/scheduler/components/framework/framework_test.go b/contrib/mesos/pkg/scheduler/components/framework/framework_test.go index 419f95bad0a..e0800b68f4f 100644 --- a/contrib/mesos/pkg/scheduler/components/framework/framework_test.go +++ b/contrib/mesos/pkg/scheduler/components/framework/framework_test.go @@ -29,7 +29,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/mock" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" ) @@ -83,8 +83,8 @@ func (r *mockRegistrator) Register(hostName string, labels map[string]string) (b } } -func mockScheduler() types.Scheduler { - mockScheduler := &types.MockScheduler{} +func mockScheduler() scheduler.Scheduler { + mockScheduler := &scheduler.MockScheduler{} reg := podtask.NewInMemoryRegistry() mockScheduler.On("Tasks").Return(reg) return mockScheduler diff --git a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go index dca374d6410..6873dd04c3d 100644 --- a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go +++ b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go @@ -24,7 +24,7 @@ import ( merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -36,13 +36,13 @@ type PodReconciler interface { } type podReconciler struct { - sched types.Scheduler + sched scheduler.Scheduler client *client.Client qr *queuer.Queuer deleter deleter.Deleter } -func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Queuer, deleter deleter.Deleter) PodReconciler { +func NewPodReconciler(sched scheduler.Scheduler, client *client.Client, qr *queuer.Queuer, deleter deleter.Deleter) PodReconciler { return &podReconciler{ sched: sched, client: client, diff --git a/contrib/mesos/pkg/scheduler/podstoreadapter.go b/contrib/mesos/pkg/scheduler/components/podstoreadapter.go similarity index 99% rename from contrib/mesos/pkg/scheduler/podstoreadapter.go rename to contrib/mesos/pkg/scheduler/components/podstoreadapter.go index 889faf28576..36e661028f9 100644 --- a/contrib/mesos/pkg/scheduler/podstoreadapter.go +++ b/contrib/mesos/pkg/scheduler/components/podstoreadapter.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package components import ( "k8s.io/kubernetes/contrib/mesos/pkg/queue" diff --git a/contrib/mesos/pkg/scheduler/components/scheduler.go b/contrib/mesos/pkg/scheduler/components/scheduler.go new file mode 100644 index 00000000000..b477e5afc88 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/components/scheduler.go @@ -0,0 +1,121 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package components + +import ( + "net/http" + "sync" + + "k8s.io/kubernetes/contrib/mesos/pkg/backoff" + "k8s.io/kubernetes/contrib/mesos/pkg/offers" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" + "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/binder" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/errorhandler" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/podreconciler" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/schedulerloop" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" +) + +// Scheduler implements types.Scheduler +type sched struct { + podReconciler podreconciler.PodReconciler + framework framework.Framework + loop schedulerloop.SchedulerLoop + + // unsafe state, needs to be guarded, especially changes to podtask.T objects + sync.RWMutex + taskRegistry podtask.Registry +} + +func NewScheduler(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler, + client *client.Client, recorder record.EventRecorder, terminate <-chan struct{}, mux *http.ServeMux, lw *cache.ListWatch) scheduler.Scheduler { + + core := &sched{ + framework: fw, + taskRegistry: podtask.NewInMemoryRegistry(), + } + + // Watch and queue pods that need scheduling. + updates := make(chan queue.Entry, c.UpdatesBacklog) + podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} + reflector := cache.NewReflector(lw, &api.Pod{}, podUpdates, 0) + + q := queuer.New(podUpdates) + + algorithm := algorithm.NewSchedulerAlgorithm(core, podUpdates, ps) + + podDeleter := deleter.NewDeleter(core, q) + + core.podReconciler = podreconciler.NewPodReconciler(core, client, q, podDeleter) + + bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration) + errorHandler := errorhandler.NewErrorHandler(core, bo, q, ps) + + binder := binder.NewBinder(core) + + startLatch := make(chan struct{}) + eventBroadcaster := record.NewBroadcaster() + + runtime.On(startLatch, func() { + eventBroadcaster.StartRecordingToSink(client.Events("")) + reflector.Run() // TODO(jdef) should listen for termination + podDeleter.Run(updates, terminate) + q.Run(terminate) + + q.InstallDebugHandlers(mux) + podtask.InstallDebugHandlers(core.Tasks(), mux) + }) + + core.loop = schedulerloop.NewSchedulerLoop(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch) + return core +} + +func (c *sched) Run(done <-chan struct{}) { + c.loop.Run(done) +} + +func (c *sched) Reconcile(t *podtask.T) { + c.podReconciler.Reconcile(t) +} + +func (c *sched) Tasks() podtask.Registry { + return c.taskRegistry +} + +func (c *sched) Offers() offers.Registry { + return c.framework.Offers() +} + +func (c *sched) KillTask(id string) error { + return c.framework.KillTask(id) +} + +func (c *sched) LaunchTask(t *podtask.T) error { + return c.framework.LaunchTask(t) +} diff --git a/contrib/mesos/pkg/scheduler/types/doc.go b/contrib/mesos/pkg/scheduler/integration/doc.go similarity index 79% rename from contrib/mesos/pkg/scheduler/types/doc.go rename to contrib/mesos/pkg/scheduler/integration/doc.go index df60e4d6e5a..d124ae9f6d4 100644 --- a/contrib/mesos/pkg/scheduler/types/doc.go +++ b/contrib/mesos/pkg/scheduler/integration/doc.go @@ -14,6 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package types contains an abstract framework interface, implemented by the -// MesosScheduler and consumed by the scheduler operations. -package types +// Package integration implements integration tests. +package integration diff --git a/contrib/mesos/pkg/scheduler/integration_test.go b/contrib/mesos/pkg/scheduler/integration/integration_test.go similarity index 98% rename from contrib/mesos/pkg/scheduler/integration_test.go rename to contrib/mesos/pkg/scheduler/integration/integration_test.go index ad47896b0e1..31cbd96b6b4 100644 --- a/contrib/mesos/pkg/scheduler/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration/integration_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package integration import ( "encoding/json" @@ -33,6 +33,8 @@ import ( "github.com/stretchr/testify/mock" assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/schedulerloop" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" @@ -430,7 +432,7 @@ type lifecycleTest struct { podsListWatch *MockPodsListWatch framework framework.Framework schedulerProc *ha.SchedulerProcess - scheduler *scheduler + sched scheduler.Scheduler t *testing.T } @@ -486,7 +488,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest { // create scheduler eventObs := NewEventObserver() - scheduler := NewScheduler(&c, framework, fcfs, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) + scheduler := components.NewScheduler(&c, framework, fcfs, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) assert.NotNil(scheduler) // create mock mesos scheduler driver @@ -499,18 +501,18 @@ func newLifecycleTest(t *testing.T) lifecycleTest { podsListWatch: podsListWatch, framework: framework, schedulerProc: schedulerProc, - scheduler: scheduler, + sched: scheduler, t: t, } } func (lt lifecycleTest) Start() <-chan LaunchedTask { assert := &EventAssertions{*assert.New(lt.t)} - lt.scheduler.Run(lt.schedulerProc.Terminal()) + lt.sched.Run(lt.schedulerProc.Terminal()) // init framework err := lt.framework.Init( - lt.scheduler, + lt.sched, lt.schedulerProc.Master(), http.DefaultServeMux, ) @@ -795,7 +797,7 @@ func TestScheduler_LifeCycle(t *testing.T) { podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name) assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { - t, _ := lt.scheduler.Tasks().ForPod(podKey) + t, _ := lt.sched.Tasks().ForPod(podKey) return t == nil }) diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 0f06e7b91cf..242a829b7c6 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -17,105 +17,22 @@ limitations under the License. package scheduler import ( - "net/http" "sync" - "k8s.io/kubernetes/contrib/mesos/pkg/backoff" "k8s.io/kubernetes/contrib/mesos/pkg/offers" - "k8s.io/kubernetes/contrib/mesos/pkg/queue" - "k8s.io/kubernetes/contrib/mesos/pkg/runtime" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/binder" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/errorhandler" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/podreconciler" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/schedulerloop" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" ) -// Scheduler implements types.Scheduler -type scheduler struct { - podReconciler podreconciler.PodReconciler - framework framework.Framework - loop schedulerloop.SchedulerLoop +// Scheduler abstracts everything other components of the scheduler need +// to access from eachother +type Scheduler interface { + Tasks() podtask.Registry + sync.Locker // synchronize changes to tasks, i.e. lock, get task, change task, store task, unlock - // unsafe state, needs to be guarded, especially changes to podtask.T objects - sync.RWMutex - taskRegistry podtask.Registry -} - -func NewScheduler(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler, - client *client.Client, recorder record.EventRecorder, terminate <-chan struct{}, mux *http.ServeMux, lw *cache.ListWatch) types.Scheduler { - - core := &scheduler{ - framework: fw, - taskRegistry: podtask.NewInMemoryRegistry(), - } - - // Watch and queue pods that need scheduling. - updates := make(chan queue.Entry, c.UpdatesBacklog) - podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} - reflector := cache.NewReflector(lw, &api.Pod{}, podUpdates, 0) - - q := queuer.New(podUpdates) - - algorithm := algorithm.NewSchedulerAlgorithm(core, podUpdates, ps) - - podDeleter := deleter.NewDeleter(core, q) - - core.podReconciler = podreconciler.NewPodReconciler(core, client, q, podDeleter) - - bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration) - errorHandler := errorhandler.NewErrorHandler(core, bo, q, ps) - - binder := binder.NewBinder(core) - - startLatch := make(chan struct{}) - eventBroadcaster := record.NewBroadcaster() - - runtime.On(startLatch, func() { - eventBroadcaster.StartRecordingToSink(client.Events("")) - reflector.Run() // TODO(jdef) should listen for termination - podDeleter.Run(updates, terminate) - q.Run(terminate) - - q.InstallDebugHandlers(mux) - podtask.InstallDebugHandlers(core.Tasks(), mux) - }) - - core.loop = schedulerloop.NewSchedulerLoop(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch) - return core -} - -func (c *scheduler) Run(done <-chan struct{}) { - c.loop.Run(done) -} - -func (c *scheduler) Reconcile(t *podtask.T) { - c.podReconciler.Reconcile(t) -} - -func (c *scheduler) Tasks() podtask.Registry { - return c.taskRegistry -} - -func (c *scheduler) Offers() offers.Registry { - return c.framework.Offers() -} - -func (c *scheduler) KillTask(id string) error { - return c.framework.KillTask(id) -} - -func (c *scheduler) LaunchTask(t *podtask.T) error { - return c.framework.LaunchTask(t) + Offers() offers.Registry + Reconcile(t *podtask.T) + KillTask(id string) error + LaunchTask(t *podtask.T) error + + Run(done <-chan struct{}) } diff --git a/contrib/mesos/pkg/scheduler/types/scheduler_mock.go b/contrib/mesos/pkg/scheduler/scheduler_mock.go similarity index 98% rename from contrib/mesos/pkg/scheduler/types/scheduler_mock.go rename to contrib/mesos/pkg/scheduler/scheduler_mock.go index 4d1f5b6171c..f4ffcf37878 100644 --- a/contrib/mesos/pkg/scheduler/types/scheduler_mock.go +++ b/contrib/mesos/pkg/scheduler/scheduler_mock.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package types +package scheduler import ( "sync" diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index eb7cf46c463..e76c718e552 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -54,7 +54,6 @@ import ( minioncfg "k8s.io/kubernetes/contrib/mesos/pkg/minion/config" "k8s.io/kubernetes/contrib/mesos/pkg/profile" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" @@ -75,6 +74,7 @@ import ( "k8s.io/kubernetes/pkg/master/ports" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/tools" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components" ) const ( @@ -763,14 +763,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything()) - scheduler := scheduler.NewScheduler(sc, framework, fcfs, client, recorder, schedulerProcess.Terminal(), s.mux, lw) + sched := components.NewScheduler(sc, framework, fcfs, client, recorder, schedulerProcess.Terminal(), s.mux, lw) - runtime.On(framework.Registration(), func() { scheduler.Run(schedulerProcess.Terminal()) }) + runtime.On(framework.Registration(), func() { sched.Run(schedulerProcess.Terminal()) }) runtime.On(framework.Registration(), s.newServiceWriter(schedulerProcess.Terminal())) driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) { log.V(1).Infoln("performing deferred initialization") - if err = framework.Init(scheduler, schedulerProcess.Master(), s.mux); err != nil { + if err = framework.Init(sched, schedulerProcess.Master(), s.mux); err != nil { return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err) } log.V(1).Infoln("deferred init complete") diff --git a/contrib/mesos/pkg/scheduler/types/scheduler.go b/contrib/mesos/pkg/scheduler/types/scheduler.go deleted file mode 100644 index db46aaa2309..00000000000 --- a/contrib/mesos/pkg/scheduler/types/scheduler.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package types - -import ( - "sync" - - "k8s.io/kubernetes/contrib/mesos/pkg/offers" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" -) - -// Scheduler abstracts everything other components of the scheduler need -// to access from eachother -type Scheduler interface { - Tasks() podtask.Registry - sync.Locker // synchronize changes to tasks, i.e. lock, get task, change task, store task, unlock - - Offers() offers.Registry - Reconcile(t *podtask.T) - KillTask(id string) error - LaunchTask(t *podtask.T) error - - Run(done <-chan struct{}) -}