From 5f070c11a72f46f66bfd9015358aa39b283b62b7 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 27 Oct 2015 19:50:23 -0500 Subject: [PATCH] Get rid of unecessary SchedulerLoopConfig detour to create a SchedulerLoop --- .../mesos/pkg/scheduler/integration_test.go | 11 +-- .../pkg/scheduler/operations/schedulerloop.go | 72 ++++++------------- .../operations/schedulerloop_test.go | 32 --------- .../mesos/pkg/scheduler/service/service.go | 8 ++- 4 files changed, 29 insertions(+), 94 deletions(-) delete mode 100644 contrib/mesos/pkg/scheduler/operations/schedulerloop_test.go diff --git a/contrib/mesos/pkg/scheduler/integration_test.go b/contrib/mesos/pkg/scheduler/integration_test.go index 4e20f244a1c..4045c5ebe8b 100644 --- a/contrib/mesos/pkg/scheduler/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration_test.go @@ -482,17 +482,10 @@ func newLifecycleTest(t *testing.T) lifecycleTest { // create scheduler process schedulerProc := ha.New(mesosScheduler) - // get SchedulerLoop config from it + // create scheduler loop fw := &MesosFramework{MesosScheduler: mesosScheduler} - config := operations.NewSchedulerLoopConfig(&c, fw, client, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) - assert.NotNil(config) - - // make events observable eventObs := NewEventObserver() - config.Recorder = eventObs - - // create loop - loop := operations.NewSchedulerLoop(config) + loop := operations.NewSchedulerLoop(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) assert.NotNil(loop) // create mock mesos scheduler driver diff --git a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go index 038be19ef5a..486a6691468 100644 --- a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go +++ b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go @@ -17,6 +17,7 @@ limitations under the License. package operations import ( + "net/http" "time" log "github.com/golang/glog" @@ -31,8 +32,6 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/fields" - "net/http" ) const ( @@ -49,26 +48,19 @@ type SchedulerLoopInterface interface { Run(<-chan struct{}) } -type SchedulerLoopConfig struct { - Algorithm *SchedulerAlgorithm - Binder *Binder - NextPod func() *api.Pod - Error func(*api.Pod, error) - Recorder record.EventRecorder - Client *client.Client - Pr *PodReconciler - Starting chan struct{} // startup latch +type SchedulerLoop struct { + algorithm *SchedulerAlgorithm + binder *Binder + nextPod func() *api.Pod + error func(*api.Pod, error) + recorder record.EventRecorder + client *client.Client + pr *PodReconciler + starting chan struct{} // startup latch } -// NewDefaultSchedulerLoopConfig creates a SchedulerLoop -func NewDefaultSchedulerLoopConfig(c *config.Config, fw types.Framework, client *client.Client, terminate <-chan struct{}, mux *http.ServeMux) *SchedulerLoopConfig { - // use ListWatch watching pods using the client by default - lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything()) - return NewSchedulerLoopConfig(c, fw, client, terminate, mux, lw) -} - -func NewSchedulerLoopConfig(c *config.Config, fw types.Framework, client *client.Client, terminate <-chan struct{}, mux *http.ServeMux, - podsWatcher *cache.ListWatch) *SchedulerLoopConfig { +func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Client, recorder record.EventRecorder, + terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *SchedulerLoop { // Watch and queue pods that need scheduling. updates := make(chan queue.Entry, c.UpdatesBacklog) @@ -95,42 +87,18 @@ func NewSchedulerLoopConfig(c *config.Config, fw types.Framework, client *client q.InstallDebugHandlers(mux) podtask.InstallDebugHandlers(fw.Tasks(), mux) }) - return &SchedulerLoopConfig{ - Algorithm: NewSchedulerAlgorithm(fw, podUpdates), - Binder: NewBinder(fw), - NextPod: q.Yield, - Error: eh.Error, - Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), - Client: client, - Pr: podReconciler, - Starting: startLatch, - } -} - -func NewSchedulerLoop(c *SchedulerLoopConfig) SchedulerLoopInterface { return &SchedulerLoop{ - algorithm: c.Algorithm, - binder: c.Binder, - nextPod: c.NextPod, - error: c.Error, - recorder: c.Recorder, - client: c.Client, - pr: c.Pr, - starting: c.Starting, + algorithm: NewSchedulerAlgorithm(fw, podUpdates), + binder: NewBinder(fw), + nextPod: q.Yield, + error: eh.Error, + recorder: recorder, + client: client, + pr: podReconciler, + starting: startLatch, } } -type SchedulerLoop struct { - algorithm *SchedulerAlgorithm - binder *Binder - nextPod func() *api.Pod - error func(*api.Pod, error) - recorder record.EventRecorder - client *client.Client - pr *PodReconciler - starting chan struct{} -} - func (s *SchedulerLoop) Run(done <-chan struct{}) { defer close(s.starting) go runtime.Until(s.scheduleOne, recoveryDelay, done) diff --git a/contrib/mesos/pkg/scheduler/operations/schedulerloop_test.go b/contrib/mesos/pkg/scheduler/operations/schedulerloop_test.go deleted file mode 100644 index 7c49fab631f..00000000000 --- a/contrib/mesos/pkg/scheduler/operations/schedulerloop_test.go +++ /dev/null @@ -1,32 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package operations - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -// Test to create the scheduler loop with an empty lopp config -func TestPlugin_New(t *testing.T) { - assert := assert.New(t) - - c := SchedulerLoopConfig{} - p := NewSchedulerLoop(&c) - assert.NotNil(p) -} \ No newline at end of file diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 2b853d44eeb..47c4f3cde56 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -67,6 +67,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth" "k8s.io/kubernetes/pkg/fields" @@ -759,8 +760,13 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config }, } + // create scheduler loop fw := &scheduler.MesosFramework{MesosScheduler: mesosScheduler} - loop := operations.NewSchedulerLoop(operations.NewDefaultSchedulerLoopConfig(sc, fw, client, schedulerProcess.Terminal(), s.mux)) + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) + lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything()) + loop := operations.NewSchedulerLoop(sc, fw, client, recorder, schedulerProcess.Terminal(), s.mux, lw) + runtime.On(mesosScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) }) runtime.On(mesosScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal()))