diff --git a/contrib/mesos/pkg/scheduler/api/types.go b/contrib/mesos/pkg/scheduler/api/types.go index a4cb11c645e..d4b13e1056f 100644 --- a/contrib/mesos/pkg/scheduler/api/types.go +++ b/contrib/mesos/pkg/scheduler/api/types.go @@ -26,7 +26,7 @@ import ( ) // scheduler abstraction to allow for easier unit testing -type SchedulerApi interface { +type Scheduler interface { sync.Locker // synchronize scheduler plugin operations podschedulers.SlaveIndex diff --git a/contrib/mesos/pkg/scheduler/mesos_scheduler.go b/contrib/mesos/pkg/scheduler/mesos_scheduler.go index a866f330966..c099305d835 100644 --- a/contrib/mesos/pkg/scheduler/mesos_scheduler.go +++ b/contrib/mesos/pkg/scheduler/mesos_scheduler.go @@ -37,11 +37,11 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/proc" "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" @@ -936,13 +936,13 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se // lock that guards critial sections that involve transferring pods from // the store (cache) to the scheduling queue; its purpose is to maintain // an ordering (vs interleaving) of operations that's easier to reason about. - kapi := &mesosSchedulerApiAdapter{mesosScheduler: k} + scheduler := &mesosSchedulerApiAdapter{mesosScheduler: k} q := queuer.New(podUpdates) - podDeleter := operations.NewDeleter(kapi, q) + podDeleter := operations.NewDeleter(scheduler, q) eh := &errorHandler{ - api: kapi, - backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration), - qr: q, + scheduler: scheduler, + backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration), + qr: q, } startLatch := make(chan struct{}) eventBroadcaster := record.NewBroadcaster() @@ -959,18 +959,18 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se Config: &plugin.Config{ NodeLister: nil, Algorithm: &schedulerApiAlgorithmAdapter{ - api: kapi, + scheduler: scheduler, podUpdates: podUpdates, }, - Binder: operations.NewBinder(kapi), + Binder: operations.NewBinder(scheduler), NextPod: q.Yield, Error: eh.handleSchedulingError, Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), }, - api: kapi, - client: k.client, - qr: q, - deleter: podDeleter, - starting: startLatch, + scheduler: scheduler, + client: k.client, + qr: q, + deleter: podDeleter, + starting: startLatch, } } diff --git a/contrib/mesos/pkg/scheduler/operations/binder.go b/contrib/mesos/pkg/scheduler/operations/binder.go index c94a04482e2..28691007911 100644 --- a/contrib/mesos/pkg/scheduler/operations/binder.go +++ b/contrib/mesos/pkg/scheduler/operations/binder.go @@ -29,12 +29,12 @@ import ( ) type Binder struct { - api schedapi.SchedulerApi + scheduler schedapi.Scheduler } -func NewBinder(api schedapi.SchedulerApi) *Binder { +func NewBinder(scheduler schedapi.Scheduler) *Binder { return &Binder{ - api: api, + scheduler: scheduler, } } @@ -49,10 +49,10 @@ func (b *Binder) Bind(binding *api.Binding) error { return err } - b.api.Lock() - defer b.api.Unlock() + b.scheduler.Lock() + defer b.scheduler.Unlock() - switch task, state := b.api.Tasks().ForPod(podKey); state { + switch task, state := b.scheduler.Tasks().ForPod(podKey); state { case podtask.StatePending: return b.bind(ctx, binding, task) default: @@ -66,7 +66,7 @@ func (b *Binder) Bind(binding *api.Binding) error { func (b *Binder) rollback(task *podtask.T, err error) error { task.Offer.Release() task.Reset() - if err2 := b.api.Tasks().Update(task); err2 != nil { + if err2 := b.scheduler.Tasks().Update(task); err2 != nil { log.Errorf("failed to update pod task: %v", err2) } return err @@ -88,7 +88,7 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e // By this time, there is a chance that the slave is disconnected. offerId := task.GetOfferId() - if offer, ok := b.api.Offers().Get(offerId); !ok || offer.HasExpired() { + if offer, ok := b.scheduler.Offers().Get(offerId); !ok || offer.HasExpired() { // already rescinded or timed out or otherwise invalidated return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID)) } @@ -96,10 +96,10 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil { log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB", task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory) - if err = b.api.LaunchTask(task); err == nil { - b.api.Offers().Invalidate(offerId) + if err = b.scheduler.LaunchTask(task); err == nil { + b.scheduler.Offers().Invalidate(offerId) task.Set(podtask.Launched) - if err = b.api.Tasks().Update(task); err != nil { + if err = b.scheduler.Tasks().Update(task); err != nil { // this should only happen if the task has been removed or has changed status, // which SHOULD NOT HAPPEN as long as we're synchronizing correctly log.Errorf("failed to update task w/ Launched status: %v", err) diff --git a/contrib/mesos/pkg/scheduler/operations/deleter.go b/contrib/mesos/pkg/scheduler/operations/deleter.go index eb5e954fc7b..adf73abb465 100644 --- a/contrib/mesos/pkg/scheduler/operations/deleter.go +++ b/contrib/mesos/pkg/scheduler/operations/deleter.go @@ -30,14 +30,14 @@ import ( ) type Deleter struct { - api schedapi.SchedulerApi - qr *queuer.Queuer + scheduler schedapi.Scheduler + qr *queuer.Queuer } -func NewDeleter(api schedapi.SchedulerApi, qr *queuer.Queuer) *Deleter { +func NewDeleter(scheduler schedapi.Scheduler, qr *queuer.Queuer) *Deleter { return &Deleter{ - api: api, - qr: qr, + scheduler: scheduler, + qr: qr, } } @@ -72,8 +72,8 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error { // removing the pod from the scheduling queue. this makes the concurrent // execution of scheduler-error-handling and delete-handling easier to // reason about. - k.api.Lock() - defer k.api.Unlock() + k.scheduler.Lock() + defer k.scheduler.Unlock() // prevent the scheduler from attempting to pop this; it's also possible that // it's concurrently being scheduled (somewhere between pod scheduling and @@ -81,7 +81,7 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error { // will abort Bind()ing k.qr.Dequeue(pod.GetUID()) - switch task, state := k.api.Tasks().ForPod(podKey); state { + switch task, state := k.scheduler.Tasks().ForPod(podKey); state { case podtask.StateUnknown: log.V(2).Infof("Could not resolve pod '%s' to task id", podKey) return merrors.NoSuchPodErr @@ -96,11 +96,11 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error { task.Reset() task.Set(podtask.Deleted) //TODO(jdef) probably want better handling here - if err := k.api.Tasks().Update(task); err != nil { + if err := k.scheduler.Tasks().Update(task); err != nil { return err } } - k.api.Tasks().Unregister(task) + k.scheduler.Tasks().Unregister(task) return nil } fallthrough @@ -108,10 +108,10 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error { case podtask.StateRunning: // signal to watchers that the related pod is going down task.Set(podtask.Deleted) - if err := k.api.Tasks().Update(task); err != nil { + if err := k.scheduler.Tasks().Update(task); err != nil { log.Errorf("failed to update task w/ Deleted status: %v", err) } - return k.api.KillTask(task.ID) + return k.scheduler.KillTask(task.ID) default: log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index ba3248e912c..f5203be7276 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -28,10 +28,10 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/pkg/api" @@ -102,7 +102,7 @@ func (k *mesosSchedulerApiAdapter) LaunchTask(task *podtask.T) error { // k8smSchedulingAlgorithm implements the algorithm.ScheduleAlgorithm interface type schedulerApiAlgorithmAdapter struct { - api schedapi.SchedulerApi + scheduler schedapi.Scheduler podUpdates queue.FIFO } @@ -118,10 +118,10 @@ func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.N return "", err } - k.api.Lock() - defer k.api.Unlock() + k.scheduler.Lock() + defer k.scheduler.Unlock() - switch task, state := k.api.Tasks().ForPod(podKey); state { + switch task, state := k.scheduler.Tasks().ForPod(podKey); state { case podtask.StateUnknown: // There's a bit of a potential race here, a pod could have been yielded() and // then before we get *here* it could be deleted. @@ -136,7 +136,7 @@ func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.N log.Infof("aborting Schedule, pod has been deleted %+v", pod) return "", merrors.NoSuchPodErr } - return k.doSchedule(k.api.Tasks().Register(k.api.CreatePodTask(ctx, pod))) + return k.doSchedule(k.scheduler.Tasks().Register(k.scheduler.CreatePodTask(ctx, pod))) //TODO(jdef) it's possible that the pod state has diverged from what //we knew previously, we should probably update the task.Pod state here @@ -166,19 +166,19 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s if task.HasAcceptedOffer() { // verify that the offer is still on the table offerId := task.GetOfferId() - if offer, ok := k.api.Offers().Get(offerId); ok && !offer.HasExpired() { + if offer, ok := k.scheduler.Offers().Get(offerId); ok && !offer.HasExpired() { // skip tasks that have already have assigned offers offer = task.Offer } else { task.Offer.Release() task.Reset() - if err = k.api.Tasks().Update(task); err != nil { + if err = k.scheduler.Tasks().Update(task); err != nil { return "", err } } } if err == nil && offer == nil { - offer, err = k.api.PodScheduler().SchedulePod(k.api.Offers(), k.api, task) + offer, err = k.scheduler.PodScheduler().SchedulePod(k.scheduler.Offers(), k.scheduler, task) } if err != nil { return "", err @@ -188,10 +188,10 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) } slaveId := details.GetSlaveId().GetValue() - if slaveHostName := k.api.SlaveHostNameFor(slaveId); slaveHostName == "" { + if slaveHostName := k.scheduler.SlaveHostNameFor(slaveId); slaveHostName == "" { // not much sense in Release()ing the offer here since its owner died offer.Release() - k.api.Offers().Invalidate(details.Id.GetValue()) + k.scheduler.Offers().Invalidate(details.Id.GetValue()) return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) } else { if task.Offer != nil && task.Offer != offer { @@ -199,9 +199,9 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s } task.Offer = offer - k.api.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? + k.scheduler.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? - if err := k.api.Tasks().Update(task); err != nil { + if err := k.scheduler.Tasks().Update(task); err != nil { offer.Release() return "", err } @@ -210,9 +210,9 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s } type errorHandler struct { - api schedapi.SchedulerApi - backoff *backoff.Backoff - qr *queuer.Queuer + scheduler schedapi.Scheduler + backoff *backoff.Backoff + qr *queuer.Queuer } // implementation of scheduling plugin's Error func; see plugin/pkg/scheduler @@ -235,10 +235,10 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) } k.backoff.GC() - k.api.Lock() - defer k.api.Unlock() + k.scheduler.Lock() + defer k.scheduler.Unlock() - switch task, state := k.api.Tasks().ForPod(podKey); state { + switch task, state := k.scheduler.Tasks().ForPod(podKey); state { case podtask.StateUnknown: // if we don't have a mapping here any more then someone deleted the pod log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey) @@ -252,16 +252,16 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) breakoutEarly := queue.BreakChan(nil) if schedulingErr == podschedulers.NoSuitableOffersErr { log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) - breakoutEarly = queue.BreakChan(k.api.Offers().Listen(podKey, func(offer *mesos.Offer) bool { - k.api.Lock() - defer k.api.Unlock() - switch task, state := k.api.Tasks().Get(task.ID); state { + breakoutEarly = queue.BreakChan(k.scheduler.Offers().Listen(podKey, func(offer *mesos.Offer) bool { + k.scheduler.Lock() + defer k.scheduler.Unlock() + switch task, state := k.scheduler.Tasks().Get(task.ID); state { case podtask.StatePending: // Assess fitness of pod with the current offer. The scheduler normally // "backs off" when it can't find an offer that matches up with a pod. // The backoff period for a pod can terminate sooner if an offer becomes // available that matches up. - return !task.Has(podtask.Launched) && k.api.PodScheduler().FitPredicate()(task, offer, nil) + return !task.Has(podtask.Launched) && k.scheduler.PodScheduler().FitPredicate()(task, offer, nil) default: // no point in continuing to check for matching offers return true @@ -279,31 +279,31 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) type PluginConfig struct { *plugin.Config - api schedapi.SchedulerApi - client *client.Client - qr *queuer.Queuer - deleter *operations.Deleter - starting chan struct{} // startup latch + scheduler schedapi.Scheduler + client *client.Client + qr *queuer.Queuer + deleter *operations.Deleter + starting chan struct{} // startup latch } func NewPlugin(c *PluginConfig) PluginInterface { return &schedulerPlugin{ - config: c.Config, - api: c.api, - client: c.client, - qr: c.qr, - deleter: c.deleter, - starting: c.starting, + config: c.Config, + scheduler: c.scheduler, + client: c.client, + qr: c.qr, + deleter: c.deleter, + starting: c.starting, } } type schedulerPlugin struct { - config *plugin.Config - api schedapi.SchedulerApi - client *client.Client - qr *queuer.Queuer - deleter *operations.Deleter - starting chan struct{} + config *plugin.Config + scheduler schedapi.Scheduler + client *client.Client + qr *queuer.Queuer + deleter *operations.Deleter + starting chan struct{} } func (s *schedulerPlugin) Run(done <-chan struct{}) { @@ -391,10 +391,10 @@ func (s *schedulerPlugin) reconcileTask(t *podtask.T) { return } - s.api.Lock() - defer s.api.Unlock() + s.scheduler.Lock() + defer s.scheduler.Unlock() - if _, state := s.api.Tasks().ForPod(podKey); state != podtask.StateUnknown { + if _, state := s.scheduler.Tasks().ForPod(podKey); state != podtask.StateUnknown { //TODO(jdef) reconcile the task log.Errorf("task already registered for pod %v", pod.Name) return diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index 058bfbc7c18..b85f9026ba4 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -41,10 +41,10 @@ import ( "github.com/stretchr/testify/mock" assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" "k8s.io/kubernetes/pkg/util" @@ -809,7 +809,7 @@ func TestPlugin_LifeCycle(t *testing.T) { podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name) assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { - t, _ := lt.plugin.api.Tasks().ForPod(podKey) + t, _ := lt.plugin.scheduler.Tasks().ForPod(podKey) return t == nil })