diff --git a/contrib/mesos/pkg/scheduler/fcfs.go b/contrib/mesos/pkg/scheduler/algorithm/fcfs.go similarity index 98% rename from contrib/mesos/pkg/scheduler/fcfs.go rename to contrib/mesos/pkg/scheduler/algorithm/fcfs.go index 9b9c93a5146..f81a8e47513 100644 --- a/contrib/mesos/pkg/scheduler/fcfs.go +++ b/contrib/mesos/pkg/scheduler/algorithm/fcfs.go @@ -101,5 +101,5 @@ func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, t return nil, err } log.V(2).Infof("failed to find a fit for pod: %s", podName) - return nil, noSuitableOffersErr + return nil, NoSuitableOffersErr } diff --git a/contrib/mesos/pkg/scheduler/types.go b/contrib/mesos/pkg/scheduler/algorithm/types.go similarity index 89% rename from contrib/mesos/pkg/scheduler/types.go rename to contrib/mesos/pkg/scheduler/algorithm/types.go index 36d4cc8fa57..99f3a7e8b2c 100644 --- a/contrib/mesos/pkg/scheduler/types.go +++ b/contrib/mesos/pkg/scheduler/algorithm/types.go @@ -51,11 +51,9 @@ type PodScheduler interface { type empty struct{} var ( - noSuitableOffersErr = errors.New("No suitable offers for pod/task") - noSuchPodErr = errors.New("No such pod exists") - noSuchTaskErr = errors.New("No such task exists") + NoSuitableOffersErr = errors.New("No suitable offers for pod/task") ) type SlaveIndex interface { - slaveHostNameFor(id string) string + SlaveHostNameFor(id string) string } diff --git a/contrib/mesos/pkg/scheduler/mock_test.go b/contrib/mesos/pkg/scheduler/mock_test.go index 163f96bd07b..93a7ed90549 100644 --- a/contrib/mesos/pkg/scheduler/mock_test.go +++ b/contrib/mesos/pkg/scheduler/mock_test.go @@ -23,6 +23,7 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" "github.com/stretchr/testify/mock" "k8s.io/kubernetes/contrib/mesos/pkg/offers" + malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/pkg/api" ) @@ -33,7 +34,7 @@ type MockScheduler struct { mock.Mock } -func (m *MockScheduler) slaveHostNameFor(id string) (hostName string) { +func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) { args := m.Called(id) x := args.Get(0) if x != nil { @@ -41,11 +42,11 @@ func (m *MockScheduler) slaveHostNameFor(id string) (hostName string) { } return } -func (m *MockScheduler) algorithm() (f PodScheduler) { +func (m *MockScheduler) algorithm() (f malgorithm.PodScheduler) { args := m.Called() x := args.Get(0) if x != nil { - f = x.(PodScheduler) + f = x.(malgorithm.PodScheduler) } return } @@ -86,8 +87,8 @@ func (m *MockScheduler) launchTask(task *podtask.T) error { // @deprecated this is a placeholder for me to test the mock package func TestNoSlavesYet(t *testing.T) { obj := &MockScheduler{} - obj.On("slaveHostNameFor", "foo").Return(nil) - obj.slaveHostNameFor("foo") + obj.On("SlaveHostNameFor", "foo").Return(nil) + obj.SlaveHostNameFor("foo") obj.AssertExpectations(t) } diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 1a3ac9cdafd..507c04e7dea 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "errors" "fmt" "net/http" "sync" @@ -29,10 +30,11 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" + apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -44,19 +46,22 @@ import ( const ( pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling -) -const ( FailedScheduling = "FailedScheduling" Scheduled = "Scheduled" ) +var ( + noSuchPodErr = errors.New("No such pod exists") + noSuchTaskErr = errors.New("No such task exists") +) + // scheduler abstraction to allow for easier unit testing type schedulerInterface interface { sync.Locker // synchronize scheduler plugin operations - SlaveIndex - algorithm() PodScheduler + malgorithm.SlaveIndex + algorithm() malgorithm.PodScheduler offers() offers.Registry tasks() podtask.Registry @@ -75,7 +80,7 @@ type k8smScheduler struct { internal *KubernetesMesosScheduler } -func (k *k8smScheduler) algorithm() PodScheduler { +func (k *k8smScheduler) algorithm() malgorithm.PodScheduler { return k.internal } @@ -91,7 +96,7 @@ func (k *k8smScheduler) createPodTask(ctx api.Context, pod *api.Pod) (*podtask.T return podtask.New(ctx, "", *pod, k.internal.executor) } -func (k *k8smScheduler) slaveHostNameFor(id string) string { +func (k *k8smScheduler) SlaveHostNameFor(id string) string { return k.internal.slaveHostNames.HostName(id) } @@ -197,7 +202,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) } slaveId := details.GetSlaveId().GetValue() - if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" { + if slaveHostName := k.api.SlaveHostNameFor(slaveId); slaveHostName == "" { // not much sense in Release()ing the offer here since its owner died offer.Release() k.api.offers().Invalidate(details.Id.GetValue()) @@ -259,7 +264,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) return } breakoutEarly := queue.BreakChan(nil) - if schedulingErr == noSuitableOffersErr { + if schedulingErr == malgorithm.NoSuitableOffersErr { log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) breakoutEarly = queue.BreakChan(k.api.offers().Listen(podKey, func(offer *mesos.Offer) bool { k.api.Lock() @@ -433,7 +438,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) { ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace) pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name) if err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { // attempt to delete if err = s.deleter.deleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr { log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err) diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index be29224cca8..9ca9f93d0c4 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -42,6 +42,7 @@ import ( assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/queue" + malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" @@ -463,7 +464,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest { ei.Data = []byte{0, 1, 2} // create scheduler - strategy := NewAllocationStrategy( + strategy := malgorithm.NewAllocationStrategy( podtask.NewDefaultPredicate( mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit, @@ -480,7 +481,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest { Host: apiServer.server.URL, Version: testapi.Default.Version(), }), - PodScheduler: NewFCFSPodScheduler(strategy, apiServer.LookupNode), + PodScheduler: malgorithm.NewFCFSPodScheduler(strategy, apiServer.LookupNode), Schedcfg: *schedcfg.CreateDefaultConfig(), LookupNode: apiServer.LookupNode, }) diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 2f252c35b89..2c33793bba9 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -35,6 +35,7 @@ import ( offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/proc" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" @@ -71,7 +72,7 @@ type KubernetesMesosScheduler struct { // and the invoking the pod registry interfaces. // In particular, changes to podtask.T objects are currently guarded by this lock. *sync.RWMutex - PodScheduler + malgorithm.PodScheduler // Config related, write-once @@ -111,7 +112,7 @@ type KubernetesMesosScheduler struct { type Config struct { Schedcfg schedcfg.Config Executor *mesos.ExecutorInfo - PodScheduler PodScheduler + PodScheduler malgorithm.PodScheduler Client *client.Client EtcdClient tools.EtcdClient FailoverTimeout float64 diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index e11b9427fbf..c5de0e26510 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/profile" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" + malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" @@ -681,7 +682,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config log.Fatalf("misconfigured etcd: %v", err) } - as := scheduler.NewAllocationStrategy( + as := malgorithm.NewAllocationStrategy( podtask.NewDefaultPredicate( s.DefaultContainerCPULimit, s.DefaultContainerMemLimit, @@ -694,7 +695,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config // downgrade allocation strategy if user disables "account-for-pod-resources" if !s.AccountForPodResources { - as = scheduler.NewAllocationStrategy( + as = malgorithm.NewAllocationStrategy( podtask.DefaultMinimalPredicate, podtask.DefaultMinimalProcurement) } @@ -716,7 +717,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config return n.(*api.Node) } - fcfs := scheduler.NewFCFSPodScheduler(as, lookupNode) + fcfs := malgorithm.NewFCFSPodScheduler(as, lookupNode) mesosPodScheduler := scheduler.New(scheduler.Config{ Schedcfg: *sc, Executor: executor,