diff --git a/cmd/hyperkube/kube-scheduler.go b/cmd/hyperkube/kube-scheduler.go index f27a3f863d2..6c5faffff95 100644 --- a/cmd/hyperkube/kube-scheduler.go +++ b/cmd/hyperkube/kube-scheduler.go @@ -15,7 +15,7 @@ limitations under the License. */ // CAUTION: If you update code in this file, you may need to also update code -// in contrib/mesos/cmd/km/k8sm-scheduler.go +// in contrib/mesos/cmd/km/k8sm-mesos_scheduler.go package main import ( diff --git a/contrib/mesos/cmd/km/k8sm-scheduler.go b/contrib/mesos/cmd/km/k8sm-scheduler.go index 3904668f4d8..79b9b3fda37 100644 --- a/contrib/mesos/cmd/km/k8sm-scheduler.go +++ b/contrib/mesos/cmd/km/k8sm-scheduler.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -// clone of the upstream cmd/hypercube/k8sm-scheduler.go +// clone of the upstream cmd/hypercube/k8sm-mesos_scheduler.go package main import ( diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 7d1a078a442..0f4d365af62 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -475,7 +475,7 @@ func (k *KubernetesMesosExecutor) launchTask(driver bindings.ExecutorDriver, tas // TODO(k8s): use Pods interface for binding once clusters are upgraded // return b.Pods(binding.Namespace).Bind(binding) if pod.Spec.NodeName == "" { - //HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/scheduler.go + //HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/mesos_scheduler.go binding := &api.Binding{ ObjectMeta: api.ObjectMeta{ Namespace: pod.Namespace, @@ -780,7 +780,7 @@ func (k *KubernetesMesosExecutor) FrameworkMessage(driver bindings.ExecutorDrive } log.Infof("Receives message from framework %v\n", message) - //TODO(jdef) master reported a lost task, reconcile this! @see scheduler.go:handleTaskLost + //TODO(jdef) master reported a lost task, reconcile this! @see mesos_scheduler.go:handleTaskLost if strings.HasPrefix(message, messages.TaskLost+":") { taskId := message[len(messages.TaskLost)+1:] if taskId != "" { diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/mesos_scheduler.go similarity index 93% rename from contrib/mesos/pkg/scheduler/scheduler.go rename to contrib/mesos/pkg/scheduler/mesos_scheduler.go index d79b8eba980..236ad2136b9 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/mesos_scheduler.go @@ -66,7 +66,7 @@ type PluginInterface interface { // 1: A mesos scheduler. // 2: A kubernetes scheduler plugin. // 3: A kubernetes pod.Registry. -type KubernetesMesosScheduler struct { +type MesosScheduler struct { // We use a lock here to avoid races // between invoking the mesos callback // and the invoking the pod registry interfaces. @@ -122,9 +122,9 @@ type Config struct { } // New creates a new KubernetesScheduler -func New(config Config) *KubernetesMesosScheduler { - var k *KubernetesMesosScheduler - k = &KubernetesMesosScheduler{ +func New(config Config) *MesosScheduler { + var k *MesosScheduler + k = &MesosScheduler{ schedulerConfig: &config.SchedulerConfig, RWMutex: new(sync.RWMutex), executor: config.Executor, @@ -180,7 +180,7 @@ func New(config Config) *KubernetesMesosScheduler { return k } -func (k *KubernetesMesosScheduler) Init(electedMaster proc.Process, pl PluginInterface, mux *http.ServeMux) error { +func (k *MesosScheduler) Init(electedMaster proc.Process, pl PluginInterface, mux *http.ServeMux) error { log.V(1).Infoln("initializing kubernetes mesos scheduler") k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error { @@ -197,13 +197,13 @@ func (k *KubernetesMesosScheduler) Init(electedMaster proc.Process, pl PluginInt return k.recoverTasks() } -func (k *KubernetesMesosScheduler) asMaster() proc.Doer { +func (k *MesosScheduler) asMaster() proc.Doer { k.RLock() defer k.RUnlock() return k.asRegisteredMaster } -func (k *KubernetesMesosScheduler) InstallDebugHandlers(mux *http.ServeMux) { +func (k *MesosScheduler) InstallDebugHandlers(mux *http.ServeMux) { wrappedHandler := func(uri string, h http.Handler) { mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) { ch := make(chan struct{}) @@ -251,12 +251,12 @@ func (k *KubernetesMesosScheduler) InstallDebugHandlers(mux *http.ServeMux) { })) } -func (k *KubernetesMesosScheduler) Registration() <-chan struct{} { +func (k *MesosScheduler) Registration() <-chan struct{} { return k.registration } // Registered is called when the scheduler registered with the master successfully. -func (k *KubernetesMesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { +func (k *MesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid) k.driver = drv @@ -268,7 +268,7 @@ func (k *KubernetesMesosScheduler) Registered(drv bindings.SchedulerDriver, fid k.reconciler.RequestExplicit() } -func (k *KubernetesMesosScheduler) storeFrameworkId() { +func (k *MesosScheduler) storeFrameworkId() { // TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available _, err := k.etcdClient.Set(meta.FrameworkIDKey, k.frameworkId.GetValue(), uint64(k.failoverTimeout)) if err != nil { @@ -278,7 +278,7 @@ func (k *KubernetesMesosScheduler) storeFrameworkId() { // Reregistered is called when the scheduler re-registered with the master successfully. // This happends when the master fails over. -func (k *KubernetesMesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { +func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { log.Infof("Scheduler reregistered with the master: %v\n", mi) k.driver = drv @@ -290,7 +290,7 @@ func (k *KubernetesMesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi } // perform one-time initialization actions upon the first registration event received from Mesos. -func (k *KubernetesMesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) { +func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) { defer close(k.registration) if k.failoverTimeout > 0 { @@ -316,7 +316,7 @@ func (k *KubernetesMesosScheduler) onInitialRegistration(driver bindings.Schedul } // Disconnected is called when the scheduler loses connection to the master. -func (k *KubernetesMesosScheduler) Disconnected(driver bindings.SchedulerDriver) { +func (k *MesosScheduler) Disconnected(driver bindings.SchedulerDriver) { log.Infof("Master disconnected!\n") k.registered = false @@ -326,7 +326,7 @@ func (k *KubernetesMesosScheduler) Disconnected(driver bindings.SchedulerDriver) } // ResourceOffers is called when the scheduler receives some offers from the master. -func (k *KubernetesMesosScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { +func (k *MesosScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { log.V(2).Infof("Received offers %+v", offers) // Record the offers in the global offer map as well as each slave's offer map. @@ -347,7 +347,7 @@ func (k *KubernetesMesosScheduler) ResourceOffers(driver bindings.SchedulerDrive } // OfferRescinded is called when the resources are recinded from the scheduler. -func (k *KubernetesMesosScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { +func (k *MesosScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { log.Infof("Offer rescinded %v\n", offerId) oid := offerId.GetValue() @@ -355,7 +355,7 @@ func (k *KubernetesMesosScheduler) OfferRescinded(driver bindings.SchedulerDrive } // StatusUpdate is called when a status update message is sent to the scheduler. -func (k *KubernetesMesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *MesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { source, reason := "none", "none" if taskStatus.Source != nil { @@ -431,7 +431,7 @@ func (k *KubernetesMesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, } } -func (k *KubernetesMesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *MesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { task, state := k.taskRegistry.UpdateStatus(taskStatus) if (state == podtask.StateRunning || state == podtask.StatePending) && @@ -471,7 +471,7 @@ func (k *KubernetesMesosScheduler) reconcileTerminalTask(driver bindings.Schedul } // reconcile an unknown (from the perspective of our registry) non-terminal task -func (k *KubernetesMesosScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *MesosScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { // attempt to recover task from pod info: // - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil // - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace @@ -543,13 +543,13 @@ func (k *KubernetesMesosScheduler) reconcileNonTerminalTask(driver bindings.Sche } // FrameworkMessage is called when the scheduler receives a message from the executor. -func (k *KubernetesMesosScheduler) FrameworkMessage(driver bindings.SchedulerDriver, +func (k *MesosScheduler) FrameworkMessage(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message) } // SlaveLost is called when some slave is lost. -func (k *KubernetesMesosScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { +func (k *MesosScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { log.Infof("Slave %v is lost\n", slaveId) sid := slaveId.GetValue() @@ -564,14 +564,14 @@ func (k *KubernetesMesosScheduler) SlaveLost(driver bindings.SchedulerDriver, sl } // ExecutorLost is called when some executor is lost. -func (k *KubernetesMesosScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { +func (k *MesosScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status) // TODO(yifan): Restart any unfinished tasks of the executor. } // Error is called when there is an unrecoverable error in the scheduler or scheduler driver. // The driver should have been aborted before this is invoked. -func (k *KubernetesMesosScheduler) Error(driver bindings.SchedulerDriver, message string) { +func (k *MesosScheduler) Error(driver bindings.SchedulerDriver, message string) { log.Fatalf("fatal scheduler error: %v\n", message) } @@ -591,7 +591,7 @@ func explicitTaskFilter(t *podtask.T) bool { // invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation // is cancelled. if any other errors occur the composite reconciler will attempt to complete the // sequence, reporting only the last generated error. -func (k *KubernetesMesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction { +func (k *MesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction { if x := len(actions); x == 0 { // programming error panic("no actions specified for composite reconciler") @@ -642,7 +642,7 @@ func (k *KubernetesMesosScheduler) makeCompositeReconciler(actions ...Reconciler // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks listed in the scheduler's internal taskRegistry. -func (k *KubernetesMesosScheduler) makeTaskRegistryReconciler() ReconcilerAction { +func (k *MesosScheduler) makeTaskRegistryReconciler() ReconcilerAction { return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { taskToSlave := make(map[string]string) for _, t := range k.taskRegistry.List(explicitTaskFilter) { @@ -656,7 +656,7 @@ func (k *KubernetesMesosScheduler) makeTaskRegistryReconciler() ReconcilerAction // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks identified by annotations in the Kubernetes pod registry. -func (k *KubernetesMesosScheduler) makePodRegistryReconciler() ReconcilerAction { +func (k *MesosScheduler) makePodRegistryReconciler() ReconcilerAction { return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) if err != nil { @@ -682,7 +682,7 @@ func (k *KubernetesMesosScheduler) makePodRegistryReconciler() ReconcilerAction } // execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/ -func (k *KubernetesMesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { +func (k *MesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { log.Info("explicit reconcile tasks") // tell mesos to send us the latest status updates for all the non-terminal tasks that we know about @@ -887,7 +887,7 @@ requestLoop: } // for } -func (ks *KubernetesMesosScheduler) recoverTasks() error { +func (ks *MesosScheduler) recoverTasks() error { podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) if err != nil { log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err) diff --git a/contrib/mesos/pkg/scheduler/scheduler_mock.go b/contrib/mesos/pkg/scheduler/mesos_scheduler_mock.go similarity index 100% rename from contrib/mesos/pkg/scheduler/scheduler_mock.go rename to contrib/mesos/pkg/scheduler/mesos_scheduler_mock.go diff --git a/contrib/mesos/pkg/scheduler/scheduler_test.go b/contrib/mesos/pkg/scheduler/mesos_scheduler_test.go similarity index 97% rename from contrib/mesos/pkg/scheduler/scheduler_test.go rename to contrib/mesos/pkg/scheduler/mesos_scheduler_test.go index 54bec10ee40..d71bbe17661 100644 --- a/contrib/mesos/pkg/scheduler/scheduler_test.go +++ b/contrib/mesos/pkg/scheduler/mesos_scheduler_test.go @@ -86,7 +86,7 @@ func TestResourceOffer_Add(t *testing.T) { assert := assert.New(t) registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)} - testScheduler := &KubernetesMesosScheduler{ + testScheduler := &MesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -131,7 +131,7 @@ func TestResourceOffer_Add(t *testing.T) { func TestResourceOffer_Add_Rescind(t *testing.T) { assert := assert.New(t) - testScheduler := &KubernetesMesosScheduler{ + testScheduler := &MesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -187,7 +187,7 @@ func TestSlave_Lost(t *testing.T) { assert := assert.New(t) // - testScheduler := &KubernetesMesosScheduler{ + testScheduler := &MesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -244,7 +244,7 @@ func TestDisconnect(t *testing.T) { assert := assert.New(t) // - testScheduler := &KubernetesMesosScheduler{ + testScheduler := &MesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -287,7 +287,7 @@ func TestStatus_Update(t *testing.T) { // setup expectations mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil) - testScheduler := &KubernetesMesosScheduler{ + testScheduler := &MesosScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index a2d87e5bdc7..ce7d49cc8d4 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -55,7 +55,7 @@ const ( type mesosSchedulerApiAdapter struct { sync.Mutex - mesosScheduler *KubernetesMesosScheduler + mesosScheduler *MesosScheduler } func (k *mesosSchedulerApiAdapter) Algorithm() malgorithm.PodScheduler { @@ -271,12 +271,12 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) } // Create creates a scheduler plugin and all supporting background functions. -func (k *KubernetesMesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig { +func (k *MesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig { // use ListWatch watching pods using the client by default return k.NewPluginConfig(terminate, mux, createAllPodsLW(k.client)) } -func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux, +func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *PluginConfig { // Watch and queue pods that need scheduling. @@ -360,7 +360,7 @@ func (s *schedulingPlugin) Run(done <-chan struct{}) { go runtime.Until(s.scheduleOne, pluginRecoveryDelay, done) } -// hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go, +// hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/mesos_scheduler.go, // with the Modeler stuff removed since we don't use it because we have mesos. func (s *schedulingPlugin) scheduleOne() { pod := s.config.NextPod() diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index d07a2efcb70..65dc97b5663 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -427,7 +427,7 @@ type lifecycleTest struct { eventObs *EventObserver plugin *schedulingPlugin podsListWatch *MockPodsListWatch - scheduler *KubernetesMesosScheduler + scheduler *MesosScheduler schedulerProc *ha.SchedulerProcess t *testing.T }