Separate SchedulerLoop instantiation from MesosScheduler

This commit is contained in:
Dr. Stefan Schimanski 2015-10-27 19:20:57 -05:00
parent 2c4142494a
commit 0ebfc02d16
6 changed files with 93 additions and 92 deletions

View File

@ -27,42 +27,42 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
type mesosFramework struct { type MesosFramework struct {
sync.Mutex sync.Mutex
mesosScheduler *MesosScheduler MesosScheduler *MesosScheduler
} }
func (fw *mesosFramework) PodScheduler() podschedulers.PodScheduler { func (fw *MesosFramework) PodScheduler() podschedulers.PodScheduler {
return fw.mesosScheduler.podScheduler return fw.MesosScheduler.podScheduler
} }
func (fw *mesosFramework) Offers() offers.Registry { func (fw *MesosFramework) Offers() offers.Registry {
return fw.mesosScheduler.offers return fw.MesosScheduler.offers
} }
func (fw *mesosFramework) Tasks() podtask.Registry { func (fw *MesosFramework) Tasks() podtask.Registry {
return fw.mesosScheduler.taskRegistry return fw.MesosScheduler.taskRegistry
} }
func (fw *mesosFramework) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) { func (fw *MesosFramework) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) {
return podtask.New(ctx, "", *pod, fw.mesosScheduler.executor) return podtask.New(ctx, "", *pod, fw.MesosScheduler.executor)
} }
func (fw *mesosFramework) SlaveHostNameFor(id string) string { func (fw *MesosFramework) SlaveHostNameFor(id string) string {
return fw.mesosScheduler.slaveHostNames.HostName(id) return fw.MesosScheduler.slaveHostNames.HostName(id)
} }
func (fw *mesosFramework) KillTask(taskId string) error { func (fw *MesosFramework) KillTask(taskId string) error {
killTaskId := mutil.NewTaskID(taskId) killTaskId := mutil.NewTaskID(taskId)
_, err := fw.mesosScheduler.driver.KillTask(killTaskId) _, err := fw.MesosScheduler.driver.KillTask(killTaskId)
return err return err
} }
func (fw *mesosFramework) LaunchTask(task *podtask.T) error { func (fw *MesosFramework) LaunchTask(task *podtask.T) error {
// assume caller is holding scheduler lock // assume caller is holding scheduler lock
taskList := []*mesos.TaskInfo{task.BuildTaskInfo()} taskList := []*mesos.TaskInfo{task.BuildTaskInfo()}
offerIds := []*mesos.OfferID{task.Offer.Details().Id} offerIds := []*mesos.OfferID{task.Offer.Details().Id}
filters := &mesos.Filters{} filters := &mesos.Filters{}
_, err := fw.mesosScheduler.driver.LaunchTasks(offerIds, taskList, filters) _, err := fw.MesosScheduler.driver.LaunchTasks(offerIds, taskList, filters)
return err return err
} }

View File

@ -461,14 +461,16 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
), ),
) )
client := client.NewOrDie(&client.Config{
Host: apiServer.server.URL,
Version: testapi.Default.Version(),
})
c := *schedcfg.CreateDefaultConfig()
mesosScheduler := New(Config{ mesosScheduler := New(Config{
Executor: ei, Executor: ei,
Client: client.NewOrDie(&client.Config{ Client: client,
Host: apiServer.server.URL,
Version: testapi.Default.Version(),
}),
PodScheduler: podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode), PodScheduler: podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode),
SchedulerConfig: *schedcfg.CreateDefaultConfig(), SchedulerConfig: c,
LookupNode: apiServer.LookupNode, LookupNode: apiServer.LookupNode,
}) })
@ -480,12 +482,9 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
// create scheduler process // create scheduler process
schedulerProc := ha.New(mesosScheduler) schedulerProc := ha.New(mesosScheduler)
// get plugin config from it // get SchedulerLoop config from it
config := mesosScheduler.NewSchedulerLoopConfig( fw := &MesosFramework{MesosScheduler: mesosScheduler}
schedulerProc.Terminal(), config := operations.NewSchedulerLoopConfig(&c, fw, client, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
http.DefaultServeMux,
&podsListWatch.ListWatch,
)
assert.NotNil(config) assert.NotNil(config)
// make events observable // make events observable

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package scheduler package operations
import ( import (
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
@ -60,4 +60,4 @@ func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string)
newobjs[i] = &queuer.Pod{Pod: pod} newobjs[i] = &queuer.Pod{Pod: pod}
} }
return psa.FIFO.Replace(newobjs, resourceVersion) return psa.FIFO.Replace(newobjs, resourceVersion)
} }

View File

@ -20,13 +20,19 @@ import (
"time" "time"
log "github.com/golang/glog" log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/backoff"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"net/http"
) )
const ( const (
@ -56,6 +62,55 @@ type SchedulerLoopConfig struct {
Starting chan struct{} // startup latch Starting chan struct{} // startup latch
} }
// NewDefaultSchedulerLoopConfig creates a SchedulerLoop
func NewDefaultSchedulerLoopConfig(c *config.Config, fw types.Framework, client *client.Client, terminate <-chan struct{}, mux *http.ServeMux) *SchedulerLoopConfig {
// use ListWatch watching pods using the client by default
lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
return NewSchedulerLoopConfig(c, fw, client, terminate, mux, lw)
}
func NewSchedulerLoopConfig(c *config.Config, fw types.Framework, client *client.Client, terminate <-chan struct{}, mux *http.ServeMux,
podsWatcher *cache.ListWatch) *SchedulerLoopConfig {
// Watch and queue pods that need scheduling.
updates := make(chan queue.Entry, c.UpdatesBacklog)
podUpdates := &podStoreAdapter{queue.NewHistorical(updates)}
reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0)
// lock that guards critial sections that involve transferring pods from
// the store (cache) to the scheduling queue; its purpose is to maintain
// an ordering (vs interleaving) of operations that's easier to reason about.
q := queuer.New(podUpdates)
podDeleter := NewDeleter(fw, q)
podReconciler := NewPodReconciler(fw, client, q, podDeleter)
bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration)
eh := NewErrorHandler(fw, bo, q)
startLatch := make(chan struct{})
eventBroadcaster := record.NewBroadcaster()
runtime.On(startLatch, func() {
eventBroadcaster.StartRecordingToSink(client.Events(""))
reflector.Run() // TODO(jdef) should listen for termination
podDeleter.Run(updates, terminate)
q.Run(terminate)
q.InstallDebugHandlers(mux)
podtask.InstallDebugHandlers(fw.Tasks(), mux)
})
return &SchedulerLoopConfig{
Algorithm: NewSchedulerAlgorithm(fw, podUpdates),
Binder: NewBinder(fw),
NextPod: q.Yield,
Error: eh.Error,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
Fw: fw,
Client: client,
Qr: q,
Pr: podReconciler,
Starting: startLatch,
}
}
func NewSchedulerLoop(c *SchedulerLoopConfig) SchedulerLoopInterface { func NewSchedulerLoop(c *SchedulerLoopConfig) SchedulerLoopInterface {
return &SchedulerLoop{ return &SchedulerLoop{
algorithm: c.Algorithm, algorithm: c.Algorithm,

View File

@ -28,14 +28,12 @@ import (
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil" mutil "github.com/mesos/mesos-go/mesosutil"
bindings "github.com/mesos/mesos-go/scheduler" bindings "github.com/mesos/mesos-go/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/backoff"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/offers"
offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/proc" "k8s.io/kubernetes/contrib/mesos/pkg/proc"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
@ -44,13 +42,10 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
@ -757,52 +752,3 @@ func (ks *MesosScheduler) recoverTasks() error {
} }
return nil return nil
} }
// Create creates a scheduler plugin and all supporting background functions.
func (k *MesosScheduler) NewDefaultSchedulerLoopConfig(terminate <-chan struct{}, mux *http.ServeMux) *operations.SchedulerLoopConfig {
// use ListWatch watching pods using the client by default
lw := cache.NewListWatchFromClient(k.client, "pods", api.NamespaceAll, fields.Everything())
return k.NewSchedulerLoopConfig(terminate, mux, lw)
}
func (k *MesosScheduler) NewSchedulerLoopConfig(terminate <-chan struct{}, mux *http.ServeMux,
podsWatcher *cache.ListWatch) *operations.SchedulerLoopConfig {
// Watch and queue pods that need scheduling.
updates := make(chan queue.Entry, k.schedulerConfig.UpdatesBacklog)
podUpdates := &podStoreAdapter{queue.NewHistorical(updates)}
reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0)
// lock that guards critial sections that involve transferring pods from
// the store (cache) to the scheduling queue; its purpose is to maintain
// an ordering (vs interleaving) of operations that's easier to reason about.
scheduler := &mesosFramework{mesosScheduler: k}
q := queuer.New(podUpdates)
podDeleter := operations.NewDeleter(scheduler, q)
podReconciler := operations.NewPodReconciler(scheduler, k.client, q, podDeleter)
bo := backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration)
eh := operations.NewErrorHandler(scheduler, bo, q)
startLatch := make(chan struct{})
eventBroadcaster := record.NewBroadcaster()
runtime.On(startLatch, func() {
eventBroadcaster.StartRecordingToSink(k.client.Events(""))
reflector.Run() // TODO(jdef) should listen for termination
podDeleter.Run(updates, terminate)
q.Run(terminate)
q.InstallDebugHandlers(mux)
podtask.InstallDebugHandlers(k.taskRegistry, mux)
})
return &operations.SchedulerLoopConfig{
Algorithm: operations.NewSchedulerAlgorithm(scheduler, podUpdates),
Binder: operations.NewBinder(scheduler),
NextPod: q.Yield,
Error: eh.Error,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
Fw: scheduler,
Client: k.client,
Qr: q,
Pr: podReconciler,
Starting: startLatch,
}
}

View File

@ -59,6 +59,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
@ -73,7 +74,6 @@ import (
"k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
) )
const ( const (
@ -719,7 +719,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
} }
fcfs := podschedulers.NewFCFSPodScheduler(as, lookupNode) fcfs := podschedulers.NewFCFSPodScheduler(as, lookupNode)
mesosPodScheduler := scheduler.New(scheduler.Config{ mesosScheduler := scheduler.New(scheduler.Config{
SchedulerConfig: *sc, SchedulerConfig: *sc,
Executor: executor, Executor: executor,
PodScheduler: fcfs, PodScheduler: fcfs,
@ -743,7 +743,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Fatalf("Misconfigured mesos framework: %v", err) log.Fatalf("Misconfigured mesos framework: %v", err)
} }
schedulerProcess := ha.New(mesosPodScheduler) schedulerProcess := ha.New(mesosScheduler)
dconfig := &bindings.DriverConfig{ dconfig := &bindings.DriverConfig{
Scheduler: schedulerProcess, Scheduler: schedulerProcess,
Framework: info, Framework: info,
@ -759,13 +759,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
}, },
} }
loop := operations.NewSchedulerLoop(mesosPodScheduler.NewDefaultSchedulerLoopConfig(schedulerProcess.Terminal(), s.mux)) fw := &scheduler.MesosFramework{MesosScheduler: mesosScheduler}
runtime.On(mesosPodScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) }) loop := operations.NewSchedulerLoop(operations.NewDefaultSchedulerLoopConfig(sc, fw, client, schedulerProcess.Terminal(), s.mux))
runtime.On(mesosPodScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal())) runtime.On(mesosScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) })
runtime.On(mesosScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal()))
driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) { driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) {
log.V(1).Infoln("performing deferred initialization") log.V(1).Infoln("performing deferred initialization")
if err = mesosPodScheduler.Init(schedulerProcess.Master(), loop, s.mux); err != nil { if err = mesosScheduler.Init(schedulerProcess.Master(), loop, s.mux); err != nil {
return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err) return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err)
} }
log.V(1).Infoln("deferred init complete") log.V(1).Infoln("deferred init complete")