Turn plugin into a SchedulerLoop and move to operations directory

This commit is contained in:
Dr. Stefan Schimanski
2015-10-26 23:14:18 -05:00
parent e5ce6eccf9
commit 2c4142494a
12 changed files with 282 additions and 176 deletions

View File

@@ -25,14 +25,6 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
@@ -44,10 +36,19 @@ import (
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
mmock "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/mock"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
// A apiserver mock which partially mocks the pods API
@@ -423,9 +424,9 @@ type LaunchedTask struct {
type lifecycleTest struct {
apiServer *TestServer
driver *joinableDriver
driver *mmock.JoinableDriver
eventObs *EventObserver
plugin *schedulerPlugin
loop operations.SchedulerLoopInterface
podsListWatch *MockPodsListWatch
scheduler *MesosScheduler
schedulerProc *ha.SchedulerProcess
@@ -471,15 +472,16 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
LookupNode: apiServer.LookupNode,
})
assert.NotNil(mesosScheduler.client, "client is nil")
assert.NotNil(mesosScheduler.executor, "executor is nil")
assert.NotNil(mesosScheduler.offers, "offer registry is nil")
// TODO(sttts): re-enable the following tests
// assert.NotNil(mesosScheduler.client, "client is nil")
// assert.NotNil(mesosScheduler.executor, "executor is nil")
// assert.NotNil(mesosScheduler.offers, "offer registry is nil")
// create scheduler process
schedulerProc := ha.New(mesosScheduler)
// get plugin config from it
config := mesosScheduler.NewPluginConfig(
config := mesosScheduler.NewSchedulerLoopConfig(
schedulerProc.Terminal(),
http.DefaultServeMux,
&podsListWatch.ListWatch,
@@ -490,18 +492,18 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
eventObs := NewEventObserver()
config.Recorder = eventObs
// create plugin
plugin := NewPlugin(config).(*schedulerPlugin)
assert.NotNil(plugin)
// create loop
loop := operations.NewSchedulerLoop(config)
assert.NotNil(loop)
// create mock mesos scheduler driver
driver := &joinableDriver{}
driver := &mmock.JoinableDriver{}
return lifecycleTest{
apiServer: apiServer,
driver: driver,
eventObs: eventObs,
plugin: plugin,
loop: loop,
podsListWatch: podsListWatch,
scheduler: mesosScheduler,
schedulerProc: schedulerProc,
@@ -511,12 +513,12 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
func (lt lifecycleTest) Start() <-chan LaunchedTask {
assert := &EventAssertions{*assert.New(lt.t)}
lt.plugin.Run(lt.schedulerProc.Terminal())
lt.loop.Run(lt.schedulerProc.Terminal())
// init scheduler
err := lt.scheduler.Init(
lt.schedulerProc.Master(),
lt.plugin,
lt.loop,
http.DefaultServeMux,
)
assert.NoError(err)
@@ -588,19 +590,10 @@ func (lt lifecycleTest) End() <-chan struct{} {
return lt.schedulerProc.End()
}
// Test to create the scheduler plugin with an empty plugin config
func TestPlugin_New(t *testing.T) {
assert := assert.New(t)
c := PluginConfig{}
p := NewPlugin(&c)
assert.NotNil(p)
}
// TestPlugin_LifeCycle creates a scheduler plugin with the config returned by the scheduler,
// TestScheduler_LifeCycle creates a scheduler plugin with the config returned by the scheduler,
// and plays through the whole life cycle of the plugin while creating pods, deleting
// and failing them.
func TestPlugin_LifeCycle(t *testing.T) {
func TestScheduler_LifeCycle(t *testing.T) {
assert := &EventAssertions{*assert.New(t)}
lt := newLifecycleTest(t)
defer lt.Close()
@@ -614,7 +607,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
lt.podsListWatch.Add(pod, true) // notify watchers
// wait for failedScheduling event because there is no offer
assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received")
assert.EventWithReason(lt.eventObs, operations.FailedScheduling, "failedScheduling event not received")
// add some matching offer
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
@@ -627,7 +620,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
lt.scheduler.ResourceOffers(nil, offers)
// and wait for scheduled pod
assert.EventWithReason(lt.eventObs, Scheduled)
assert.EventWithReason(lt.eventObs, operations.Scheduled)
select {
case launchedTask := <-launchedTasks:
// report back that the task has been staged, and then started by mesos
@@ -664,7 +657,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
// Launch a pod and wait until the scheduler driver is called
schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
// wait for failedScheduling event because there is no offer
assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received")
assert.EventWithReason(lt.eventObs, operations.FailedScheduling, "failedScheduling event not received")
// supply a matching offer
lt.scheduler.ResourceOffers(lt.driver, offers)
@@ -679,7 +672,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
}
// and wait to get scheduled
assert.EventWithReason(lt.eventObs, Scheduled)
assert.EventWithReason(lt.eventObs, operations.Scheduled)
// wait for driver.launchTasks call
select {
@@ -809,7 +802,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
t, _ := lt.plugin.fw.Tasks().ForPod(podKey)
t, _ := lt.scheduler.taskRegistry.ForPod(podKey)
return t == nil
})

View File

@@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package mock contains a Mesos scheduler driver mock
package mock

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package mock
import (
mesos "github.com/mesos/mesos-go/mesosproto"
@@ -145,13 +145,13 @@ func (m *MockSchedulerDriver) Wait() {
m.Called()
}
type joinableDriver struct {
type JoinableDriver struct {
MockSchedulerDriver
joinFunc func() (mesos.Status, error)
}
// Join invokes joinFunc if it has been set, otherwise blocks forever
func (m *joinableDriver) Join() (mesos.Status, error) {
func (m *JoinableDriver) Join() (mesos.Status, error) {
if m.joinFunc != nil {
return m.joinFunc()
}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package operations
import (
"fmt"
@@ -27,18 +27,24 @@ import (
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
)
// mesosSchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
type mesosSchedulerAlgorithm struct {
// SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
type SchedulerAlgorithm struct {
fw types.Framework
podUpdates queue.FIFO
}
func NewSchedulerAlgorithm(fw types.Framework, podUpdates queue.FIFO) *SchedulerAlgorithm {
return &SchedulerAlgorithm{
fw: fw,
podUpdates: podUpdates,
}
}
// Schedule implements the Scheduler interface of Kubernetes.
// It returns the selectedMachine's name and error (if there's any).
func (k *mesosSchedulerAlgorithm) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) {
func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
log.Infof("Try to schedule pod %v\n", pod.Name)
ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
@@ -91,7 +97,7 @@ func (k *mesosSchedulerAlgorithm) Schedule(pod *api.Pod, unused algorithm.NodeLi
}
// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
func (k *mesosSchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) {
func (k *SchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) {
var offer offers.Perishable
if task.HasAcceptedOffer() {
// verify that the offer is still on the table

View File

@@ -14,5 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package operations implements independent aspects of the scheduler
// Package operations implements independent aspects of the scheduler which
// do not use MesosScheduler internals, but rely solely on the Framework
// interface.
package operations

View File

@@ -14,112 +14,38 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package operations
import (
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
plugin "k8s.io/kubernetes/plugin/pkg/scheduler"
)
const (
pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling
FailedScheduling = "FailedScheduling"
Scheduled = "Scheduled"
)
type PluginInterface interface {
// the apiserver may have a different state for the pod than we do
// so reconcile our records, but only for this one pod
reconcileTask(*podtask.T)
// execute the Scheduling plugin, should start a go routine and return immediately
Run(<-chan struct{})
// PodReconciler reconciles a pod with the apiserver
type PodReconciler struct {
fw types.Framework
client *client.Client
qr *queuer.Queuer
deleter *Deleter
}
type PluginConfig struct {
*plugin.Config
fw types.Framework
client *client.Client
qr *queuer.Queuer
deleter *operations.Deleter
starting chan struct{} // startup latch
}
func NewPlugin(c *PluginConfig) PluginInterface {
return &schedulerPlugin{
config: c.Config,
fw: c.fw,
client: c.client,
qr: c.qr,
deleter: c.deleter,
starting: c.starting,
func NewPodReconciler(fw types.Framework, client *client.Client, qr *queuer.Queuer, deleter *Deleter) *PodReconciler {
return &PodReconciler{
fw: fw,
client: client,
qr: qr,
deleter: deleter,
}
}
type schedulerPlugin struct {
config *plugin.Config
fw types.Framework
client *client.Client
qr *queuer.Queuer
deleter *operations.Deleter
starting chan struct{}
}
func (s *schedulerPlugin) Run(done <-chan struct{}) {
defer close(s.starting)
go runtime.Until(s.scheduleOne, pluginRecoveryDelay, done)
}
// hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go,
// with the Modeler stuff removed since we don't use it because we have mesos.
func (s *schedulerPlugin) scheduleOne() {
pod := s.config.NextPod()
// pods which are pre-scheduled (i.e. NodeName is set) are deleted by the kubelet
// in upstream. Not so in Mesos because the kubelet hasn't see that pod yet. Hence,
// the scheduler has to take care of this:
if pod.Spec.NodeName != "" && pod.DeletionTimestamp != nil {
log.V(3).Infof("deleting pre-scheduled, not yet running pod: %s/%s", pod.Namespace, pod.Name)
s.client.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0))
return
}
log.V(3).Infof("Attempting to schedule: %+v", pod)
dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister) // call kubeScheduler.Schedule
if err != nil {
log.V(1).Infof("Failed to schedule: %+v", pod)
s.config.Recorder.Eventf(pod, FailedScheduling, "Error scheduling: %v", err)
s.config.Error(pod, err)
return
}
b := &api.Binding{
ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
Target: api.ObjectReference{
Kind: "Node",
Name: dest,
},
}
if err := s.config.Binder.Bind(b); err != nil {
log.V(1).Infof("Failed to bind pod: %+v", err)
s.config.Recorder.Eventf(pod, FailedScheduling, "Binding rejected: %v", err)
s.config.Error(pod, err)
return
}
s.config.Recorder.Eventf(pod, Scheduled, "Successfully assigned %v to %v", pod.Name, dest)
}
// this pod may be out of sync with respect to the API server registry:
// this pod | apiserver registry
// -------------|----------------------
@@ -131,7 +57,7 @@ func (s *schedulerPlugin) scheduleOne() {
// host="..." | host="..." ; perhaps no updates to process?
//
// TODO(jdef) this needs an integration test
func (s *schedulerPlugin) reconcileTask(t *podtask.T) {
func (s *PodReconciler) Reconcile(t *podtask.T) {
log.V(1).Infof("reconcile pod %v, assigned to slave %q", t.Pod.Name, t.Spec.AssignedSlave)
ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace)
pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name)

View File

@@ -0,0 +1,133 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operations
import (
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
)
const (
recoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling
FailedScheduling = "FailedScheduling"
Scheduled = "Scheduled"
)
type SchedulerLoopInterface interface {
ReconcilePodTask(t *podtask.T)
// execute the Scheduling plugin, should start a go routine and return immediately
Run(<-chan struct{})
}
type SchedulerLoopConfig struct {
Algorithm *SchedulerAlgorithm
Binder *Binder
NextPod func() *api.Pod
Error func(*api.Pod, error)
Recorder record.EventRecorder
Fw types.Framework
Client *client.Client
Qr *queuer.Queuer
Pr *PodReconciler
Starting chan struct{} // startup latch
}
func NewSchedulerLoop(c *SchedulerLoopConfig) SchedulerLoopInterface {
return &SchedulerLoop{
algorithm: c.Algorithm,
binder: c.Binder,
nextPod: c.NextPod,
error: c.Error,
recorder: c.Recorder,
fw: c.Fw,
client: c.Client,
qr: c.Qr,
pr: c.Pr,
starting: c.Starting,
}
}
type SchedulerLoop struct {
algorithm *SchedulerAlgorithm
binder *Binder
nextPod func() *api.Pod
error func(*api.Pod, error)
recorder record.EventRecorder
fw types.Framework
client *client.Client
qr *queuer.Queuer
pr *PodReconciler
starting chan struct{}
}
func (s *SchedulerLoop) Run(done <-chan struct{}) {
defer close(s.starting)
go runtime.Until(s.scheduleOne, recoveryDelay, done)
}
// hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go,
// with the Modeler stuff removed since we don't use it because we have mesos.
func (s *SchedulerLoop) scheduleOne() {
pod := s.nextPod()
// pods which are pre-scheduled (i.e. NodeName is set) are deleted by the kubelet
// in upstream. Not so in Mesos because the kubelet hasn't see that pod yet. Hence,
// the scheduler has to take care of this:
if pod.Spec.NodeName != "" && pod.DeletionTimestamp != nil {
log.V(3).Infof("deleting pre-scheduled, not yet running pod: %s/%s", pod.Namespace, pod.Name)
s.client.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0))
return
}
log.V(3).Infof("Attempting to schedule: %+v", pod)
dest, err := s.algorithm.Schedule(pod)
if err != nil {
log.V(1).Infof("Failed to schedule: %+v", pod)
s.recorder.Eventf(pod, FailedScheduling, "Error scheduling: %v", err)
s.error(pod, err)
return
}
b := &api.Binding{
ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
Target: api.ObjectReference{
Kind: "Node",
Name: dest,
},
}
if err := s.binder.Bind(b); err != nil {
log.V(1).Infof("Failed to bind pod: %+v", err)
s.recorder.Eventf(pod, FailedScheduling, "Binding rejected: %v", err)
s.error(pod, err)
return
}
s.recorder.Eventf(pod, Scheduled, "Successfully assigned %v to %v", pod.Name, dest)
}
func (s *SchedulerLoop) ReconcilePodTask(t *podtask.T) {
s.pr.Reconcile(t)
}

View File

@@ -0,0 +1,32 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operations
import (
"testing"
"github.com/stretchr/testify/assert"
)
// Test to create the scheduler loop with an empty lopp config
func TestPlugin_New(t *testing.T) {
assert := assert.New(t)
c := SchedulerLoopConfig{}
p := NewSchedulerLoop(&c)
assert.NotNil(p)
}

View File

@@ -29,7 +29,7 @@ import (
type ReconcilerAction func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error
type Reconciler struct {
type TasksReconciler struct {
proc.Doer
Action ReconcilerAction
explicit chan struct{} // send an empty struct to trigger explicit reconciliation
@@ -39,9 +39,9 @@ type Reconciler struct {
explicitReconciliationAbortTimeout time.Duration
}
func NewReconciler(doer proc.Doer, action ReconcilerAction,
cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) *Reconciler {
return &Reconciler{
func NewTasksReconciler(doer proc.Doer, action ReconcilerAction,
cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) *TasksReconciler {
return &TasksReconciler{
Doer: doer,
explicit: make(chan struct{}, 1),
implicit: make(chan struct{}, 1),
@@ -67,14 +67,14 @@ func NewReconciler(doer proc.Doer, action ReconcilerAction,
}
}
func (r *Reconciler) RequestExplicit() {
func (r *TasksReconciler) RequestExplicit() {
select {
case r.explicit <- struct{}{}: // noop
default: // request queue full; noop
}
}
func (r *Reconciler) RequestImplicit() {
func (r *TasksReconciler) RequestImplicit() {
select {
case r.implicit <- struct{}{}: // noop
default: // request queue full; noop
@@ -84,7 +84,7 @@ func (r *Reconciler) RequestImplicit() {
// execute task reconciliation, returns when r.done is closed. intended to run as a goroutine.
// if reconciliation is requested while another is in progress, the in-progress operation will be
// cancelled before the new reconciliation operation begins.
func (r *Reconciler) Run(driver bindings.SchedulerDriver) {
func (r *TasksReconciler) Run(driver bindings.SchedulerDriver) {
var cancel, finished chan struct{}
requestLoop:
for {

View File

@@ -57,7 +57,6 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
plugin "k8s.io/kubernetes/plugin/pkg/scheduler"
)
// KubernetesScheduler implements:
@@ -100,8 +99,8 @@ type MesosScheduler struct {
// via deferred init
plugin PluginInterface
reconciler *operations.Reconciler
loop operations.SchedulerLoopInterface
reconciler *operations.TasksReconciler
reconcileCooldown time.Duration
asRegisteredMaster proc.Doer
terminate <-chan struct{} // signal chan, closes when we should kill background tasks
@@ -119,7 +118,7 @@ type Config struct {
LookupNode node.LookupFunc
}
// New creates a new KubernetesScheduler
// New creates a new MesosScheduler
func New(config Config) *MesosScheduler {
var k *MesosScheduler
k = &MesosScheduler{
@@ -178,7 +177,7 @@ func New(config Config) *MesosScheduler {
return k
}
func (k *MesosScheduler) Init(electedMaster proc.Process, pl PluginInterface, mux *http.ServeMux) error {
func (k *MesosScheduler) Init(electedMaster proc.Process, sl operations.SchedulerLoopInterface, mux *http.ServeMux) error {
log.V(1).Infoln("initializing kubernetes mesos scheduler")
k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error {
@@ -188,7 +187,7 @@ func (k *MesosScheduler) Init(electedMaster proc.Process, pl PluginInterface, mu
return electedMaster.Do(a)
})
k.terminate = electedMaster.Done()
k.plugin = pl
k.loop = sl
k.offers.Init(k.terminate)
k.InstallDebugHandlers(mux)
k.nodeRegistrator.Run(k.terminate)
@@ -296,7 +295,7 @@ func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver)
r1 := k.makeTaskRegistryReconciler()
r2 := k.makePodRegistryReconciler()
k.reconciler = operations.NewReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
k.reconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
go k.reconciler.Run(driver)
@@ -398,7 +397,7 @@ func (k *MesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatu
case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR:
if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
go k.plugin.reconcileTask(task)
go k.loop.ReconcilePodTask(task)
return
}
} else {
@@ -760,14 +759,14 @@ func (ks *MesosScheduler) recoverTasks() error {
}
// Create creates a scheduler plugin and all supporting background functions.
func (k *MesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig {
func (k *MesosScheduler) NewDefaultSchedulerLoopConfig(terminate <-chan struct{}, mux *http.ServeMux) *operations.SchedulerLoopConfig {
// use ListWatch watching pods using the client by default
lw := cache.NewListWatchFromClient(k.client, "pods", api.NamespaceAll, fields.Everything())
return k.NewPluginConfig(terminate, mux, lw)
return k.NewSchedulerLoopConfig(terminate, mux, lw)
}
func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux,
podsWatcher *cache.ListWatch) *PluginConfig {
func (k *MesosScheduler) NewSchedulerLoopConfig(terminate <-chan struct{}, mux *http.ServeMux,
podsWatcher *cache.ListWatch) *operations.SchedulerLoopConfig {
// Watch and queue pods that need scheduling.
updates := make(chan queue.Entry, k.schedulerConfig.UpdatesBacklog)
@@ -780,6 +779,7 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se
scheduler := &mesosFramework{mesosScheduler: k}
q := queuer.New(podUpdates)
podDeleter := operations.NewDeleter(scheduler, q)
podReconciler := operations.NewPodReconciler(scheduler, k.client, q, podDeleter)
bo := backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration)
eh := operations.NewErrorHandler(scheduler, bo, q)
startLatch := make(chan struct{})
@@ -793,22 +793,16 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se
q.InstallDebugHandlers(mux)
podtask.InstallDebugHandlers(k.taskRegistry, mux)
})
return &PluginConfig{
Config: &plugin.Config{
NodeLister: nil,
Algorithm: &mesosSchedulerAlgorithm{
fw: scheduler,
podUpdates: podUpdates,
},
Binder: operations.NewBinder(scheduler),
NextPod: q.Yield,
Error: eh.Error,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
},
fw: scheduler,
client: k.client,
qr: q,
deleter: podDeleter,
starting: startLatch,
return &operations.SchedulerLoopConfig{
Algorithm: operations.NewSchedulerAlgorithm(scheduler, podUpdates),
Binder: operations.NewBinder(scheduler),
NextPod: q.Yield,
Error: eh.Error,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
Fw: scheduler,
Client: k.client,
Qr: q,
Pr: podReconciler,
Starting: startLatch,
}
}

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/mock"
)
//get number of non-expired offers from offer registry
@@ -283,7 +284,7 @@ func TestDisconnect(t *testing.T) {
//test we can handle different status updates, TODO check state transitions
func TestStatus_Update(t *testing.T) {
mockdriver := MockSchedulerDriver{}
mockdriver := mock.MockSchedulerDriver{}
// setup expectations
mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil)

View File

@@ -73,6 +73,7 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
)
const (
@@ -758,13 +759,13 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
},
}
kpl := scheduler.NewPlugin(mesosPodScheduler.NewDefaultPluginConfig(schedulerProcess.Terminal(), s.mux))
runtime.On(mesosPodScheduler.Registration(), func() { kpl.Run(schedulerProcess.Terminal()) })
loop := operations.NewSchedulerLoop(mesosPodScheduler.NewDefaultSchedulerLoopConfig(schedulerProcess.Terminal(), s.mux))
runtime.On(mesosPodScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) })
runtime.On(mesosPodScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal()))
driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) {
log.V(1).Infoln("performing deferred initialization")
if err = mesosPodScheduler.Init(schedulerProcess.Master(), kpl, s.mux); err != nil {
if err = mesosPodScheduler.Init(schedulerProcess.Master(), loop, s.mux); err != nil {
return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err)
}
log.V(1).Infoln("deferred init complete")