From 01a97ebc14faa2210e7d538f0363aca2f6c498e0 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Sun, 25 Oct 2015 10:28:23 -0700 Subject: [PATCH] Rename KubernetesScheduler -> KubernetesMesosScheduler --- contrib/mesos/pkg/scheduler/plugin.go | 6 +-- contrib/mesos/pkg/scheduler/plugin_test.go | 2 +- contrib/mesos/pkg/scheduler/scheduler.go | 54 +++++++++---------- contrib/mesos/pkg/scheduler/scheduler_test.go | 10 ++-- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 1c8d01eede7..ed219e2645c 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -78,7 +78,7 @@ type schedulerInterface interface { type k8smScheduler struct { sync.Mutex - internal *KubernetesScheduler + internal *KubernetesMesosScheduler } func (k *k8smScheduler) algorithm() PodScheduler { @@ -652,12 +652,12 @@ func (k *deleter) deleteOne(pod *Pod) error { } // Create creates a scheduler plugin and all supporting background functions. -func (k *KubernetesScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig { +func (k *KubernetesMesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig { // use ListWatch watching pods using the client by default return k.NewPluginConfig(terminate, mux, createAllPodsLW(k.client)) } -func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux, +func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *PluginConfig { // Watch and queue pods that need scheduling. 
diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index f641a9c2e67..731f65787b7 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -440,7 +440,7 @@ type lifecycleTest struct { eventObs *EventObserver plugin *schedulingPlugin podsListWatch *MockPodsListWatch - scheduler *KubernetesScheduler + scheduler *KubernetesMesosScheduler schedulerProc *ha.SchedulerProcess t *testing.T } diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 55e6d7bfa6d..67fdef3c945 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -65,7 +65,7 @@ type PluginInterface interface { // 1: A mesos scheduler. // 2: A kubernetes scheduler plugin. // 3: A kubernetes pod.Registry. -type KubernetesScheduler struct { +type KubernetesMesosScheduler struct { // We use a lock here to avoid races // between invoking the mesos callback // and the invoking the pod registry interfaces. 
@@ -121,9 +121,9 @@ type Config struct { } -// New creates a new KubernetesScheduler +// New creates a new KubernetesMesosScheduler -func New(config Config) *KubernetesMesosScheduler { - var k *KubernetesMesosScheduler - k = &KubernetesMesosScheduler{ +func New(config Config) *KubernetesMesosScheduler { + var k *KubernetesMesosScheduler + k = &KubernetesMesosScheduler{ schedcfg: &config.Schedcfg, RWMutex: new(sync.RWMutex), executor: config.Executor, @@ -179,7 +179,7 @@ func New(config Config) *KubernetesScheduler { return k } -func (k *KubernetesScheduler) Init(electedMaster proc.Process, pl PluginInterface, mux *http.ServeMux) error { +func (k *KubernetesMesosScheduler) Init(electedMaster proc.Process, pl PluginInterface, mux *http.ServeMux) error { log.V(1).Infoln("initializing kubernetes mesos scheduler") k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error { @@ -196,13 +196,13 @@ func (k *KubernetesScheduler) Init(electedMaster proc.Process, pl PluginInterfac return k.recoverTasks() } -func (k *KubernetesScheduler) asMaster() proc.Doer { +func (k *KubernetesMesosScheduler) asMaster() proc.Doer { k.RLock() defer k.RUnlock() return k.asRegisteredMaster } -func (k *KubernetesScheduler) InstallDebugHandlers(mux *http.ServeMux) { +func (k *KubernetesMesosScheduler) InstallDebugHandlers(mux *http.ServeMux) { wrappedHandler := func(uri string, h http.Handler) { mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) { ch := make(chan struct{}) @@ -250,12 +250,12 @@ func (k *KubernetesScheduler) InstallDebugHandlers(mux *http.ServeMux) { })) } -func (k *KubernetesScheduler) Registration() <-chan struct{} { +func (k *KubernetesMesosScheduler) Registration() <-chan struct{} { return k.registration } // Registered is called when the scheduler registered with the master successfully. 
-func (k *KubernetesScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { +func (k *KubernetesMesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid) k.driver = drv @@ -267,7 +267,7 @@ func (k *KubernetesScheduler) Registered(drv bindings.SchedulerDriver, fid *meso k.reconciler.RequestExplicit() } -func (k *KubernetesScheduler) storeFrameworkId() { +func (k *KubernetesMesosScheduler) storeFrameworkId() { // TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available _, err := k.etcdClient.Set(meta.FrameworkIDKey, k.frameworkId.GetValue(), uint64(k.failoverTimeout)) if err != nil { @@ -277,7 +277,7 @@ func (k *KubernetesScheduler) storeFrameworkId() { // Reregistered is called when the scheduler re-registered with the master successfully. // This happends when the master fails over. -func (k *KubernetesScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { +func (k *KubernetesMesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { log.Infof("Scheduler reregistered with the master: %v\n", mi) k.driver = drv @@ -289,7 +289,7 @@ func (k *KubernetesScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mes } // perform one-time initialization actions upon the first registration event received from Mesos. -func (k *KubernetesScheduler) onInitialRegistration(driver bindings.SchedulerDriver) { +func (k *KubernetesMesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) { defer close(k.registration) if k.failoverTimeout > 0 { @@ -315,7 +315,7 @@ func (k *KubernetesScheduler) onInitialRegistration(driver bindings.SchedulerDri } // Disconnected is called when the scheduler loses connection to the master. 
-func (k *KubernetesScheduler) Disconnected(driver bindings.SchedulerDriver) { +func (k *KubernetesMesosScheduler) Disconnected(driver bindings.SchedulerDriver) { log.Infof("Master disconnected!\n") k.registered = false @@ -325,7 +325,7 @@ func (k *KubernetesScheduler) Disconnected(driver bindings.SchedulerDriver) { } // ResourceOffers is called when the scheduler receives some offers from the master. -func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { +func (k *KubernetesMesosScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { log.V(2).Infof("Received offers %+v", offers) // Record the offers in the global offer map as well as each slave's offer map. @@ -346,7 +346,7 @@ func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, of } // OfferRescinded is called when the resources are recinded from the scheduler. -func (k *KubernetesScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { +func (k *KubernetesMesosScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { log.Infof("Offer rescinded %v\n", offerId) oid := offerId.GetValue() @@ -354,7 +354,7 @@ func (k *KubernetesScheduler) OfferRescinded(driver bindings.SchedulerDriver, of } // StatusUpdate is called when a status update message is sent to the scheduler. 
-func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *KubernetesMesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { source, reason := "none", "none" if taskStatus.Source != nil { @@ -430,7 +430,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task } } -func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *KubernetesMesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { task, state := k.taskRegistry.UpdateStatus(taskStatus) if (state == podtask.StateRunning || state == podtask.StatePending) && @@ -470,7 +470,7 @@ func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDri } // reconcile an unknown (from the perspective of our registry) non-terminal task -func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *KubernetesMesosScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { // attempt to recover task from pod info: // - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil // - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace @@ -542,13 +542,13 @@ func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.Scheduler } // FrameworkMessage is called when the scheduler receives a message from the executor. 
-func (k *KubernetesScheduler) FrameworkMessage(driver bindings.SchedulerDriver, +func (k *KubernetesMesosScheduler) FrameworkMessage(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message) } // SlaveLost is called when some slave is lost. -func (k *KubernetesScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { +func (k *KubernetesMesosScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { log.Infof("Slave %v is lost\n", slaveId) sid := slaveId.GetValue() @@ -563,14 +563,14 @@ func (k *KubernetesScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId } // ExecutorLost is called when some executor is lost. -func (k *KubernetesScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { +func (k *KubernetesMesosScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status) // TODO(yifan): Restart any unfinished tasks of the executor. } // Error is called when there is an unrecoverable error in the scheduler or scheduler driver. // The driver should have been aborted before this is invoked. -func (k *KubernetesScheduler) Error(driver bindings.SchedulerDriver, message string) { +func (k *KubernetesMesosScheduler) Error(driver bindings.SchedulerDriver, message string) { log.Fatalf("fatal scheduler error: %v\n", message) } @@ -590,7 +590,7 @@ func explicitTaskFilter(t *podtask.T) bool { // invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation // is cancelled. if any other errors occur the composite reconciler will attempt to complete the // sequence, reporting only the last generated error. 
-func (k *KubernetesScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction { +func (k *KubernetesMesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction { if x := len(actions); x == 0 { // programming error panic("no actions specified for composite reconciler") @@ -641,7 +641,7 @@ func (k *KubernetesScheduler) makeCompositeReconciler(actions ...ReconcilerActio // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks listed in the scheduler's internal taskRegistry. -func (k *KubernetesScheduler) makeTaskRegistryReconciler() ReconcilerAction { +func (k *KubernetesMesosScheduler) makeTaskRegistryReconciler() ReconcilerAction { return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { taskToSlave := make(map[string]string) for _, t := range k.taskRegistry.List(explicitTaskFilter) { @@ -655,7 +655,7 @@ func (k *KubernetesScheduler) makeTaskRegistryReconciler() ReconcilerAction { // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks identified by annotations in the Kubernetes pod registry. 
-func (k *KubernetesScheduler) makePodRegistryReconciler() ReconcilerAction { +func (k *KubernetesMesosScheduler) makePodRegistryReconciler() ReconcilerAction { return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) if err != nil { @@ -681,7 +681,7 @@ func (k *KubernetesScheduler) makePodRegistryReconciler() ReconcilerAction { } // execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/ -func (k *KubernetesScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { +func (k *KubernetesMesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { log.Info("explicit reconcile tasks") // tell mesos to send us the latest status updates for all the non-terminal tasks that we know about @@ -886,7 +886,7 @@ requestLoop: } // for } -func (ks *KubernetesScheduler) recoverTasks() error { +func (ks *KubernetesMesosScheduler) recoverTasks() error { podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) if err != nil { log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err) diff --git a/contrib/mesos/pkg/scheduler/scheduler_test.go b/contrib/mesos/pkg/scheduler/scheduler_test.go index d2312e1c899..54bec10ee40 100644 --- a/contrib/mesos/pkg/scheduler/scheduler_test.go +++ b/contrib/mesos/pkg/scheduler/scheduler_test.go @@ -86,7 +86,7 @@ func TestResourceOffer_Add(t *testing.T) { assert := assert.New(t) registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)} - testScheduler := &KubernetesScheduler{ + testScheduler := &KubernetesMesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -131,7 +131,7 @@ func 
TestResourceOffer_Add(t *testing.T) { func TestResourceOffer_Add_Rescind(t *testing.T) { assert := assert.New(t) - testScheduler := &KubernetesScheduler{ + testScheduler := &KubernetesMesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -187,7 +187,7 @@ func TestSlave_Lost(t *testing.T) { assert := assert.New(t) // - testScheduler := &KubernetesScheduler{ + testScheduler := &KubernetesMesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -244,7 +244,7 @@ func TestDisconnect(t *testing.T) { assert := assert.New(t) // - testScheduler := &KubernetesScheduler{ + testScheduler := &KubernetesMesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -287,7 +287,7 @@ func TestStatus_Update(t *testing.T) { // setup expectations mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil) - testScheduler := &KubernetesScheduler{ + testScheduler := &KubernetesMesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true