Rename KubernetesScheduler -> KubernetesMesosScheduler

This commit is contained in:
Dr. Stefan Schimanski 2015-10-25 10:28:23 -07:00
parent 866a17d6d8
commit 01a97ebc14
4 changed files with 36 additions and 36 deletions

View File

@ -78,7 +78,7 @@ type schedulerInterface interface {
type k8smScheduler struct { type k8smScheduler struct {
sync.Mutex sync.Mutex
internal *KubernetesScheduler internal *KubernetesMesosScheduler
} }
func (k *k8smScheduler) algorithm() PodScheduler { func (k *k8smScheduler) algorithm() PodScheduler {
@ -652,12 +652,12 @@ func (k *deleter) deleteOne(pod *Pod) error {
} }
// Create creates a scheduler plugin and all supporting background functions. // Create creates a scheduler plugin and all supporting background functions.
func (k *KubernetesScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig { func (k *KubernetesMesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig {
// use ListWatch watching pods using the client by default // use ListWatch watching pods using the client by default
return k.NewPluginConfig(terminate, mux, createAllPodsLW(k.client)) return k.NewPluginConfig(terminate, mux, createAllPodsLW(k.client))
} }
func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux, func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux,
podsWatcher *cache.ListWatch) *PluginConfig { podsWatcher *cache.ListWatch) *PluginConfig {
// Watch and queue pods that need scheduling. // Watch and queue pods that need scheduling.

View File

@ -440,7 +440,7 @@ type lifecycleTest struct {
eventObs *EventObserver eventObs *EventObserver
plugin *schedulingPlugin plugin *schedulingPlugin
podsListWatch *MockPodsListWatch podsListWatch *MockPodsListWatch
scheduler *KubernetesScheduler scheduler *KubernetesMesosScheduler
schedulerProc *ha.SchedulerProcess schedulerProc *ha.SchedulerProcess
t *testing.T t *testing.T
} }

View File

@ -65,7 +65,7 @@ type PluginInterface interface {
// 1: A mesos scheduler. // 1: A mesos scheduler.
// 2: A kubernetes scheduler plugin. // 2: A kubernetes scheduler plugin.
// 3: A kubernetes pod.Registry. // 3: A kubernetes pod.Registry.
type KubernetesScheduler struct { type KubernetesMesosScheduler struct {
// We use a lock here to avoid races // We use a lock here to avoid races
// between invoking the mesos callback // between invoking the mesos callback
// and invoking the pod registry interfaces. // and invoking the pod registry interfaces.
@ -121,9 +121,9 @@ type Config struct {
} }
// New creates a new KubernetesScheduler // New creates a new KubernetesScheduler
func New(config Config) *KubernetesScheduler { func New(config Config) *KubernetesMesosScheduler {
var k *KubernetesScheduler var k *KubernetesMesosScheduler
k = &KubernetesScheduler{ k = &KubernetesMesosScheduler{
schedcfg: &config.Schedcfg, schedcfg: &config.Schedcfg,
RWMutex: new(sync.RWMutex), RWMutex: new(sync.RWMutex),
executor: config.Executor, executor: config.Executor,
@ -179,7 +179,7 @@ func New(config Config) *KubernetesScheduler {
return k return k
} }
func (k *KubernetesScheduler) Init(electedMaster proc.Process, pl PluginInterface, mux *http.ServeMux) error { func (k *KubernetesMesosScheduler) Init(electedMaster proc.Process, pl PluginInterface, mux *http.ServeMux) error {
log.V(1).Infoln("initializing kubernetes mesos scheduler") log.V(1).Infoln("initializing kubernetes mesos scheduler")
k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error { k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error {
@ -196,13 +196,13 @@ func (k *KubernetesScheduler) Init(electedMaster proc.Process, pl PluginInterfac
return k.recoverTasks() return k.recoverTasks()
} }
func (k *KubernetesScheduler) asMaster() proc.Doer { func (k *KubernetesMesosScheduler) asMaster() proc.Doer {
k.RLock() k.RLock()
defer k.RUnlock() defer k.RUnlock()
return k.asRegisteredMaster return k.asRegisteredMaster
} }
func (k *KubernetesScheduler) InstallDebugHandlers(mux *http.ServeMux) { func (k *KubernetesMesosScheduler) InstallDebugHandlers(mux *http.ServeMux) {
wrappedHandler := func(uri string, h http.Handler) { wrappedHandler := func(uri string, h http.Handler) {
mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
ch := make(chan struct{}) ch := make(chan struct{})
@ -250,12 +250,12 @@ func (k *KubernetesScheduler) InstallDebugHandlers(mux *http.ServeMux) {
})) }))
} }
func (k *KubernetesScheduler) Registration() <-chan struct{} { func (k *KubernetesMesosScheduler) Registration() <-chan struct{} {
return k.registration return k.registration
} }
// Registered is called when the scheduler registered with the master successfully. // Registered is called when the scheduler registered with the master successfully.
func (k *KubernetesScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { func (k *KubernetesMesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) {
log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid) log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid)
k.driver = drv k.driver = drv
@ -267,7 +267,7 @@ func (k *KubernetesScheduler) Registered(drv bindings.SchedulerDriver, fid *meso
k.reconciler.RequestExplicit() k.reconciler.RequestExplicit()
} }
func (k *KubernetesScheduler) storeFrameworkId() { func (k *KubernetesMesosScheduler) storeFrameworkId() {
// TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available // TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available
_, err := k.etcdClient.Set(meta.FrameworkIDKey, k.frameworkId.GetValue(), uint64(k.failoverTimeout)) _, err := k.etcdClient.Set(meta.FrameworkIDKey, k.frameworkId.GetValue(), uint64(k.failoverTimeout))
if err != nil { if err != nil {
@ -277,7 +277,7 @@ func (k *KubernetesScheduler) storeFrameworkId() {
// Reregistered is called when the scheduler re-registered with the master successfully. // Reregistered is called when the scheduler re-registered with the master successfully.
// This happens when the master fails over. // This happens when the master fails over.
func (k *KubernetesScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { func (k *KubernetesMesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) {
log.Infof("Scheduler reregistered with the master: %v\n", mi) log.Infof("Scheduler reregistered with the master: %v\n", mi)
k.driver = drv k.driver = drv
@ -289,7 +289,7 @@ func (k *KubernetesScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mes
} }
// perform one-time initialization actions upon the first registration event received from Mesos. // perform one-time initialization actions upon the first registration event received from Mesos.
func (k *KubernetesScheduler) onInitialRegistration(driver bindings.SchedulerDriver) { func (k *KubernetesMesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) {
defer close(k.registration) defer close(k.registration)
if k.failoverTimeout > 0 { if k.failoverTimeout > 0 {
@ -315,7 +315,7 @@ func (k *KubernetesScheduler) onInitialRegistration(driver bindings.SchedulerDri
} }
// Disconnected is called when the scheduler loses connection to the master. // Disconnected is called when the scheduler loses connection to the master.
func (k *KubernetesScheduler) Disconnected(driver bindings.SchedulerDriver) { func (k *KubernetesMesosScheduler) Disconnected(driver bindings.SchedulerDriver) {
log.Infof("Master disconnected!\n") log.Infof("Master disconnected!\n")
k.registered = false k.registered = false
@ -325,7 +325,7 @@ func (k *KubernetesScheduler) Disconnected(driver bindings.SchedulerDriver) {
} }
// ResourceOffers is called when the scheduler receives some offers from the master. // ResourceOffers is called when the scheduler receives some offers from the master.
func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { func (k *KubernetesMesosScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) {
log.V(2).Infof("Received offers %+v", offers) log.V(2).Infof("Received offers %+v", offers)
// Record the offers in the global offer map as well as each slave's offer map. // Record the offers in the global offer map as well as each slave's offer map.
@ -346,7 +346,7 @@ func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, of
} }
// OfferRescinded is called when the resources are rescinded from the scheduler. // OfferRescinded is called when the resources are rescinded from the scheduler.
func (k *KubernetesScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { func (k *KubernetesMesosScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) {
log.Infof("Offer rescinded %v\n", offerId) log.Infof("Offer rescinded %v\n", offerId)
oid := offerId.GetValue() oid := offerId.GetValue()
@ -354,7 +354,7 @@ func (k *KubernetesScheduler) OfferRescinded(driver bindings.SchedulerDriver, of
} }
// StatusUpdate is called when a status update message is sent to the scheduler. // StatusUpdate is called when a status update message is sent to the scheduler.
func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { func (k *KubernetesMesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
source, reason := "none", "none" source, reason := "none", "none"
if taskStatus.Source != nil { if taskStatus.Source != nil {
@ -430,7 +430,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
} }
} }
func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { func (k *KubernetesMesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
task, state := k.taskRegistry.UpdateStatus(taskStatus) task, state := k.taskRegistry.UpdateStatus(taskStatus)
if (state == podtask.StateRunning || state == podtask.StatePending) && if (state == podtask.StateRunning || state == podtask.StatePending) &&
@ -470,7 +470,7 @@ func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDri
} }
// reconcile an unknown (from the perspective of our registry) non-terminal task // reconcile an unknown (from the perspective of our registry) non-terminal task
func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { func (k *KubernetesMesosScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
// attempt to recover task from pod info: // attempt to recover task from pod info:
// - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil // - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil
// - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace // - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace
@ -542,13 +542,13 @@ func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.Scheduler
} }
// FrameworkMessage is called when the scheduler receives a message from the executor. // FrameworkMessage is called when the scheduler receives a message from the executor.
func (k *KubernetesScheduler) FrameworkMessage(driver bindings.SchedulerDriver, func (k *KubernetesMesosScheduler) FrameworkMessage(driver bindings.SchedulerDriver,
executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) {
log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message) log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message)
} }
// SlaveLost is called when some slave is lost. // SlaveLost is called when some slave is lost.
func (k *KubernetesScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { func (k *KubernetesMesosScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) {
log.Infof("Slave %v is lost\n", slaveId) log.Infof("Slave %v is lost\n", slaveId)
sid := slaveId.GetValue() sid := slaveId.GetValue()
@ -563,14 +563,14 @@ func (k *KubernetesScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId
} }
// ExecutorLost is called when some executor is lost. // ExecutorLost is called when some executor is lost.
func (k *KubernetesScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { func (k *KubernetesMesosScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) {
log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status) log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status)
// TODO(yifan): Restart any unfinished tasks of the executor. // TODO(yifan): Restart any unfinished tasks of the executor.
} }
// Error is called when there is an unrecoverable error in the scheduler or scheduler driver. // Error is called when there is an unrecoverable error in the scheduler or scheduler driver.
// The driver should have been aborted before this is invoked. // The driver should have been aborted before this is invoked.
func (k *KubernetesScheduler) Error(driver bindings.SchedulerDriver, message string) { func (k *KubernetesMesosScheduler) Error(driver bindings.SchedulerDriver, message string) {
log.Fatalf("fatal scheduler error: %v\n", message) log.Fatalf("fatal scheduler error: %v\n", message)
} }
@ -590,7 +590,7 @@ func explicitTaskFilter(t *podtask.T) bool {
// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation // invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation
// is cancelled. if any other errors occur the composite reconciler will attempt to complete the // is cancelled. if any other errors occur the composite reconciler will attempt to complete the
// sequence, reporting only the last generated error. // sequence, reporting only the last generated error.
func (k *KubernetesScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction { func (k *KubernetesMesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction {
if x := len(actions); x == 0 { if x := len(actions); x == 0 {
// programming error // programming error
panic("no actions specified for composite reconciler") panic("no actions specified for composite reconciler")
@ -641,7 +641,7 @@ func (k *KubernetesScheduler) makeCompositeReconciler(actions ...ReconcilerActio
// reconciler action factory, performs explicit task reconciliation for non-terminal // reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks listed in the scheduler's internal taskRegistry. // tasks listed in the scheduler's internal taskRegistry.
func (k *KubernetesScheduler) makeTaskRegistryReconciler() ReconcilerAction { func (k *KubernetesMesosScheduler) makeTaskRegistryReconciler() ReconcilerAction {
return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
taskToSlave := make(map[string]string) taskToSlave := make(map[string]string)
for _, t := range k.taskRegistry.List(explicitTaskFilter) { for _, t := range k.taskRegistry.List(explicitTaskFilter) {
@ -655,7 +655,7 @@ func (k *KubernetesScheduler) makeTaskRegistryReconciler() ReconcilerAction {
// reconciler action factory, performs explicit task reconciliation for non-terminal // reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks identified by annotations in the Kubernetes pod registry. // tasks identified by annotations in the Kubernetes pod registry.
func (k *KubernetesScheduler) makePodRegistryReconciler() ReconcilerAction { func (k *KubernetesMesosScheduler) makePodRegistryReconciler() ReconcilerAction {
return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil { if err != nil {
@ -681,7 +681,7 @@ func (k *KubernetesScheduler) makePodRegistryReconciler() ReconcilerAction {
} }
// execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/ // execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/
func (k *KubernetesScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { func (k *KubernetesMesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error {
log.Info("explicit reconcile tasks") log.Info("explicit reconcile tasks")
// tell mesos to send us the latest status updates for all the non-terminal tasks that we know about // tell mesos to send us the latest status updates for all the non-terminal tasks that we know about
@ -886,7 +886,7 @@ requestLoop:
} // for } // for
} }
func (ks *KubernetesScheduler) recoverTasks() error { func (ks *KubernetesMesosScheduler) recoverTasks() error {
podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil { if err != nil {
log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err) log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err)

View File

@ -86,7 +86,7 @@ func TestResourceOffer_Add(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)} registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)}
testScheduler := &KubernetesScheduler{ testScheduler := &KubernetesMesosScheduler{
offers: offers.CreateRegistry(offers.RegistryConfig{ offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool { Compat: func(o *mesos.Offer) bool {
return true return true
@ -131,7 +131,7 @@ func TestResourceOffer_Add(t *testing.T) {
func TestResourceOffer_Add_Rescind(t *testing.T) { func TestResourceOffer_Add_Rescind(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
testScheduler := &KubernetesScheduler{ testScheduler := &KubernetesMesosScheduler{
offers: offers.CreateRegistry(offers.RegistryConfig{ offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool { Compat: func(o *mesos.Offer) bool {
return true return true
@ -187,7 +187,7 @@ func TestSlave_Lost(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
// //
testScheduler := &KubernetesScheduler{ testScheduler := &KubernetesMesosScheduler{
offers: offers.CreateRegistry(offers.RegistryConfig{ offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool { Compat: func(o *mesos.Offer) bool {
return true return true
@ -244,7 +244,7 @@ func TestDisconnect(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
// //
testScheduler := &KubernetesScheduler{ testScheduler := &KubernetesMesosScheduler{
offers: offers.CreateRegistry(offers.RegistryConfig{ offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool { Compat: func(o *mesos.Offer) bool {
return true return true
@ -287,7 +287,7 @@ func TestStatus_Update(t *testing.T) {
// setup expectations // setup expectations
mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil) mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil)
testScheduler := &KubernetesScheduler{ testScheduler := &KubernetesMesosScheduler{
offers: offers.CreateRegistry(offers.RegistryConfig{ offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool { Compat: func(o *mesos.Offer) bool {
return true return true