diff --git a/contrib/mesos/pkg/scheduler/mock_test.go b/contrib/mesos/pkg/scheduler/mock_test.go
index 17930a86813..163f96bd07b 100644
--- a/contrib/mesos/pkg/scheduler/mock_test.go
+++ b/contrib/mesos/pkg/scheduler/mock_test.go
@@ -33,13 +33,12 @@ type MockScheduler struct {
 	mock.Mock
 }
 
-func (m *MockScheduler) slaveFor(id string) (slave *Slave, ok bool) {
+func (m *MockScheduler) slaveHostNameFor(id string) (hostName string) {
 	args := m.Called(id)
 	x := args.Get(0)
 	if x != nil {
-		slave = x.(*Slave)
+		hostName = x.(string)
 	}
-	ok = args.Bool(1)
 	return
 }
 func (m *MockScheduler) algorithm() (f PodScheduler) {
@@ -87,8 +86,8 @@ func (m *MockScheduler) launchTask(task *podtask.T) error {
 // @deprecated this is a placeholder for me to test the mock package
 func TestNoSlavesYet(t *testing.T) {
 	obj := &MockScheduler{}
-	obj.On("slaveFor", "foo").Return(nil, false)
-	obj.slaveFor("foo")
+	obj.On("slaveHostNameFor", "foo").Return(nil)
+	obj.slaveHostNameFor("foo")
 	obj.AssertExpectations(t)
 }
 
diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index faddf02ae69..4bb7ac770e5 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -92,9 +92,8 @@ func (k *k8smScheduler) createPodTask(ctx api.Context, pod *api.Pod) (*podtask.T
 	return podtask.New(ctx, "", *pod, k.internal.executor)
 }
 
-func (k *k8smScheduler) slaveFor(id string) (slave *Slave, ok bool) {
-	slave, ok = k.internal.slaves.getSlave(id)
-	return
+func (k *k8smScheduler) slaveHostNameFor(id string) string {
+	return k.internal.slaveHostNames.HostName(id)
 }
 
 func (k *k8smScheduler) killTask(taskId string) error {
@@ -326,7 +325,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 			return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
 		}
 		slaveId := details.GetSlaveId().GetValue()
-		if slave, ok := k.api.slaveFor(slaveId); !ok {
+		if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" {
 			// not much sense in Release()ing the offer here since its owner died
 			offer.Release()
 			k.api.offers().Invalidate(details.Id.GetValue())
@@ -343,7 +342,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 				offer.Release()
 				return "", err
 			}
-			return slave.HostName, nil
+			return slaveHostName, nil
 		}
 	}
 
diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go
index fc722a5370c..51c4dbed533 100644
--- a/contrib/mesos/pkg/scheduler/scheduler.go
+++ b/contrib/mesos/pkg/scheduler/scheduler.go
@@ -39,6 +39,7 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
@@ -50,54 +51,6 @@ import (
 	"k8s.io/kubernetes/pkg/util/sets"
 )
 
-type Slave struct {
-	HostName string
-}
-
-func newSlave(hostName string) *Slave {
-	return &Slave{
-		HostName: hostName,
-	}
-}
-
-type slaveStorage struct {
-	sync.Mutex
-	slaves map[string]*Slave // SlaveID => slave.
-}
-
-func newSlaveStorage() *slaveStorage {
-	return &slaveStorage{
-		slaves: make(map[string]*Slave),
-	}
-}
-
-// Create a mapping between a slaveID and slave if not existing.
-func (self *slaveStorage) checkAndAdd(slaveId, slaveHostname string) {
-	self.Lock()
-	defer self.Unlock()
-	_, exists := self.slaves[slaveId]
-	if !exists {
-		self.slaves[slaveId] = newSlave(slaveHostname)
-	}
-}
-
-func (self *slaveStorage) getSlaveIds() []string {
-	self.Lock()
-	defer self.Unlock()
-	slaveIds := make([]string, 0, len(self.slaves))
-	for slaveID := range self.slaves {
-		slaveIds = append(slaveIds, slaveID)
-	}
-	return slaveIds
-}
-
-func (self *slaveStorage) getSlave(slaveId string) (*Slave, bool) {
-	self.Lock()
-	defer self.Unlock()
-	slave, exists := self.slaves[slaveId]
-	return slave, exists
-}
-
 type PluginInterface interface {
 	// the apiserver may have a different state for the pod than we do
 	// so reconcile our records, but only for this one pod
@@ -138,7 +91,7 @@ type KubernetesScheduler struct {
 	registration      chan struct{} // signal chan that closes upon first successful registration
 	onRegistration    sync.Once
 	offers            offers.Registry
-	slaves            *slaveStorage
+	slaveHostNames    *slave.Registry
 
 	// unsafe state, needs to be guarded
 
@@ -205,7 +158,7 @@ func New(config Config) *KubernetesScheduler {
 			TTL:           config.Schedcfg.OfferTTL.Duration,
 			ListenerDelay: config.Schedcfg.ListenerDelay.Duration,
 		}),
-		slaves:            newSlaveStorage(),
+		slaveHostNames:    slave.NewRegistry(),
 		taskRegistry:      podtask.NewInMemoryRegistry(),
 		reconcileCooldown: config.ReconcileCooldown,
 		registration:      make(chan struct{}),
@@ -270,7 +223,7 @@ func (k *KubernetesScheduler) InstallDebugHandlers(mux *http.ServeMux) {
 	requestReconciliation("/debug/actions/requestImplicit", k.reconciler.RequestImplicit)
 
 	wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		slaves := k.slaves.getSlaveIds()
+		slaves := k.slaveHostNames.SlaveIDs()
 		for _, slaveId := range slaves {
 			_, err := k.driver.SendFrameworkMessage(
 				k.executor.ExecutorId,
@@ -368,7 +321,7 @@ func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, of
 	k.offers.Add(offers)
 	for _, offer := range offers {
 		slaveId := offer.GetSlaveId().GetValue()
-		k.slaves.checkAndAdd(slaveId, offer.GetHostname())
+		k.slaveHostNames.Register(slaveId, offer.GetHostname())
 	}
 }
 
@@ -423,7 +376,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
 		} // else, we don't really care about FINISHED tasks that aren't registered
 		return
 	}
-	if _, exists := k.slaves.getSlave(taskStatus.GetSlaveId().GetValue()); !exists {
+	if hostName := k.slaveHostNames.HostName(taskStatus.GetSlaveId().GetValue()); hostName == "" {
 		// a registered task has an update reported by a slave that we don't recognize.
 		// this should never happen! So we don't reconcile it.
log.Errorf("Ignore status %+v because the slave does not exist", taskStatus) @@ -920,7 +873,7 @@ func (ks *KubernetesScheduler) recoverTasks() error { recoverSlave := func(t *podtask.T) { slaveId := t.Spec.SlaveID - ks.slaves.checkAndAdd(slaveId, t.Offer.Host()) + ks.slaveHostNames.Register(slaveId, t.Offer.Host()) } for _, pod := range podList.Items { if t, ok, err := podtask.RecoverFrom(pod); err != nil { diff --git a/contrib/mesos/pkg/scheduler/scheduler_test.go b/contrib/mesos/pkg/scheduler/scheduler_test.go index 79beadb2be2..9d4e6b9d0d3 100644 --- a/contrib/mesos/pkg/scheduler/scheduler_test.go +++ b/contrib/mesos/pkg/scheduler/scheduler_test.go @@ -26,76 +26,9 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/proc" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" ) -// Check that same slave is only added once. -func TestSlaveStorage_checkAndAdd(t *testing.T) { - assert := assert.New(t) - - slaveStorage := newSlaveStorage() - assert.Equal(0, len(slaveStorage.slaves)) - - slaveId := "slave1" - slaveHostname := "slave1Hostname" - slaveStorage.checkAndAdd(slaveId, slaveHostname) - assert.Equal(1, len(slaveStorage.getSlaveIds())) - - slaveStorage.checkAndAdd(slaveId, slaveHostname) - assert.Equal(1, len(slaveStorage.getSlaveIds())) -} - -// Check that getSlave returns notExist for nonexisting slave. -func TestSlaveStorage_getSlave(t *testing.T) { - assert := assert.New(t) - - slaveStorage := newSlaveStorage() - assert.Equal(0, len(slaveStorage.slaves)) - - slaveId := "slave1" - slaveHostname := "slave1Hostname" - - _, exists := slaveStorage.getSlave(slaveId) - assert.Equal(false, exists) - - slaveStorage.checkAndAdd(slaveId, slaveHostname) - assert.Equal(1, len(slaveStorage.getSlaveIds())) - - _, exists = slaveStorage.getSlave(slaveId) - assert.Equal(true, exists) -} - -// Check that getSlaveIds returns array with all slaveIds. 
-func TestSlaveStorage_getSlaveIds(t *testing.T) {
-	assert := assert.New(t)
-
-	slaveStorage := newSlaveStorage()
-	assert.Equal(0, len(slaveStorage.slaves))
-
-	slaveId := "1"
-	slaveHostname := "hn1"
-	slaveStorage.checkAndAdd(slaveId, slaveHostname)
-	assert.Equal(1, len(slaveStorage.getSlaveIds()))
-
-	slaveId = "2"
-	slaveHostname = "hn2"
-	slaveStorage.checkAndAdd(slaveId, slaveHostname)
-	assert.Equal(2, len(slaveStorage.getSlaveIds()))
-
-	slaveIds := slaveStorage.getSlaveIds()
-
-	slaveIdsMap := make(map[string]bool, len(slaveIds))
-	for _, s := range slaveIds {
-		slaveIdsMap[s] = true
-	}
-
-	_, ok := slaveIdsMap["1"]
-	assert.Equal(ok, true)
-
-	_, ok = slaveIdsMap["2"]
-	assert.Equal(ok, true)
-
-}
-
 //get number of non-expired offers from offer registry
 func getNumberOffers(os offers.Registry) int {
 	//walk offers and check it is stored in registry
@@ -126,7 +59,7 @@ func TestResourceOffer_Add(t *testing.T) {
 			TTL:           schedcfg.DefaultOfferTTL,
 			ListenerDelay: schedcfg.DefaultListenerDelay,
 		}),
-		slaves: newSlaveStorage(),
+		slaveHostNames: slave.NewRegistry(),
 	}
 
 	hostname := "h1"
@@ -137,7 +70,7 @@ func TestResourceOffer_Add(t *testing.T) {
 	assert.Equal(1, getNumberOffers(testScheduler.offers))
 
 	//check slave hostname
-	assert.Equal(1, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs()))
 
 	//add another offer
 	hostname2 := "h2"
@@ -149,7 +82,7 @@ func TestResourceOffer_Add(t *testing.T) {
 	assert.Equal(2, getNumberOffers(testScheduler.offers))
 
 	//check slave hostnames
-	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
 }
 
 //test adding of ressource offer, should be added to offer registry and slavesf
@@ -169,7 +102,7 @@ func TestResourceOffer_Add_Rescind(t *testing.T) {
 			TTL:           schedcfg.DefaultOfferTTL,
 			ListenerDelay: schedcfg.DefaultListenerDelay,
 		}),
-		slaves: newSlaveStorage(),
+		slaveHostNames: slave.NewRegistry(),
 	}
 
 	hostname := "h1"
@@ -181,7 +114,7 @@ func TestResourceOffer_Add_Rescind(t *testing.T) {
 	assert.Equal(1, getNumberOffers(testScheduler.offers))
 
 	//check slave hostname
-	assert.Equal(1, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs()))
 
 	//add another offer
 	hostname2 := "h2"
@@ -192,7 +125,7 @@ func TestResourceOffer_Add_Rescind(t *testing.T) {
 	assert.Equal(2, getNumberOffers(testScheduler.offers))
 
 	//check slave hostnames
-	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
 
 	//next whether offers can be rescinded
 	testScheduler.OfferRescinded(nil, offerID1)
@@ -222,7 +155,7 @@ func TestSlave_Lost(t *testing.T) {
 			TTL:           schedcfg.DefaultOfferTTL,
 			ListenerDelay: schedcfg.DefaultListenerDelay,
 		}),
-		slaves: newSlaveStorage(),
+		slaveHostNames: slave.NewRegistry(),
 	}
 
 	hostname := "h1"
@@ -241,7 +174,7 @@ func TestSlave_Lost(t *testing.T) {
 
 	//test precondition
 	assert.Equal(3, getNumberOffers(testScheduler.offers))
-	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
 
 	//remove first slave
 	testScheduler.SlaveLost(nil, util.NewSlaveID(hostname))
@@ -249,7 +182,7 @@ func TestSlave_Lost(t *testing.T) {
 	//offers should be removed
 	assert.Equal(1, getNumberOffers(testScheduler.offers))
 	//slave hostnames should still be all present
-	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
 
 	//remove second slave
 	testScheduler.SlaveLost(nil, util.NewSlaveID(hostname2))
@@ -257,7 +190,7 @@ func TestSlave_Lost(t *testing.T) {
 	//offers should be removed
 	assert.Equal(0, getNumberOffers(testScheduler.offers))
 	//slave hostnames should still be all present
-	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
 
 	//try to remove non existing slave
 	testScheduler.SlaveLost(nil, util.NewSlaveID("notExist"))
@@ -279,7 +212,7 @@ func TestDisconnect(t *testing.T) {
 			TTL:           schedcfg.DefaultOfferTTL,
 			ListenerDelay: schedcfg.DefaultListenerDelay,
 		}),
-		slaves: newSlaveStorage(),
+		slaveHostNames: slave.NewRegistry(),
 	}
 
 	hostname := "h1"
@@ -302,7 +235,7 @@ func TestDisconnect(t *testing.T) {
 	//all offers should be removed
 	assert.Equal(0, getNumberOffers(testScheduler.offers))
 	//slave hostnames should still be all present
-	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
+	assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
 }
 
 //test we can handle different status updates, TODO check state transitions
@@ -322,9 +255,9 @@ func TestStatus_Update(t *testing.T) {
 			TTL:           schedcfg.DefaultOfferTTL,
 			ListenerDelay: schedcfg.DefaultListenerDelay,
 		}),
-		slaves:       newSlaveStorage(),
-		driver:       &mockdriver,
-		taskRegistry: podtask.NewInMemoryRegistry(),
+		slaveHostNames: slave.NewRegistry(),
+		driver:         &mockdriver,
+		taskRegistry:   podtask.NewInMemoryRegistry(),
 	}
 
 	taskStatus_task_starting := util.NewTaskStatus(
diff --git a/contrib/mesos/pkg/scheduler/slave/doc.go b/contrib/mesos/pkg/scheduler/slave/doc.go
new file mode 100644
index 00000000000..0ea8c988ad2
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/slave/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package slave manages node hostnames for slave ids.
+package slave
diff --git a/contrib/mesos/pkg/scheduler/slave/slave.go b/contrib/mesos/pkg/scheduler/slave/slave.go
new file mode 100644
index 00000000000..6999f8315b9
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/slave/slave.go
@@ -0,0 +1,60 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package slave
+
+import (
+	"sync"
+)
+
+type Registry struct {
+	lock      sync.Mutex
+	hostNames map[string]string
+}
+
+func NewRegistry() *Registry {
+	return &Registry{
+		hostNames: map[string]string{},
+	}
+}
+
+// Register creates a mapping between a slaveId and a slave hostname, if one does not already exist.
+func (st *Registry) Register(slaveId, slaveHostname string) {
+	st.lock.Lock()
+	defer st.lock.Unlock()
+	_, exists := st.hostNames[slaveId]
+	if !exists {
+		st.hostNames[slaveId] = slaveHostname
+	}
+}
+
+// SlaveIDs returns the slave IDs currently registered.
+func (st *Registry) SlaveIDs() []string {
+	st.lock.Lock()
+	defer st.lock.Unlock()
+	slaveIds := make([]string, 0, len(st.hostNames))
+	for slaveID := range st.hostNames {
+		slaveIds = append(slaveIds, slaveID)
+	}
+	return slaveIds
+}
+
+// HostName looks up the hostname for a given slaveId.
+func (st *Registry) HostName(slaveId string) string {
+	st.lock.Lock()
+	defer st.lock.Unlock()
+	return st.hostNames[slaveId]
+}
diff --git a/contrib/mesos/pkg/scheduler/slave/slave_test.go b/contrib/mesos/pkg/scheduler/slave/slave_test.go
new file mode 100644
index 00000000000..67e8df9765a
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/slave/slave_test.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package slave
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// Check that the same slave is only added once.
+func TestSlaveStorage_Register(t *testing.T) {
+	assert := assert.New(t)
+
+	slaveStorage := NewRegistry()
+	assert.Equal(0, len(slaveStorage.hostNames))
+
+	slaveId := "slave1"
+	slaveHostname := "slave1Hostname"
+	slaveStorage.Register(slaveId, slaveHostname)
+	assert.Equal(1, len(slaveStorage.SlaveIDs()))
+
+	slaveStorage.Register(slaveId, slaveHostname)
+	assert.Equal(1, len(slaveStorage.SlaveIDs()))
+}
+
+// Check that HostName returns an empty hostname for a nonexistent slave.
+func TestSlaveStorage_HostName(t *testing.T) {
+	assert := assert.New(t)
+
+	slaveStorage := NewRegistry()
+	assert.Equal(0, len(slaveStorage.hostNames))
+
+	slaveId := "slave1"
+	slaveHostname := "slave1Hostname"
+
+	h := slaveStorage.HostName(slaveId)
+	assert.Equal(h, "")
+
+	slaveStorage.Register(slaveId, slaveHostname)
+	assert.Equal(1, len(slaveStorage.SlaveIDs()))
+
+	h = slaveStorage.HostName(slaveId)
+	assert.Equal(h, slaveHostname)
+}
+
+// Check that SlaveIDs returns an array with all slaveIds.
+func TestSlaveStorage_SlaveIds(t *testing.T) {
+	assert := assert.New(t)
+
+	slaveStorage := NewRegistry()
+	assert.Equal(0, len(slaveStorage.hostNames))
+
+	slaveId := "1"
+	slaveHostname := "hn1"
+	slaveStorage.Register(slaveId, slaveHostname)
+	assert.Equal(1, len(slaveStorage.SlaveIDs()))
+
+	slaveId = "2"
+	slaveHostname = "hn2"
+	slaveStorage.Register(slaveId, slaveHostname)
+	assert.Equal(2, len(slaveStorage.SlaveIDs()))
+
+	slaveIds := slaveStorage.SlaveIDs()
+
+	slaveIdsMap := make(map[string]bool, len(slaveIds))
+	for _, s := range slaveIds {
+		slaveIdsMap[s] = true
+	}
+
+	_, ok := slaveIdsMap["1"]
+	assert.Equal(ok, true)
+
+	_, ok = slaveIdsMap["2"]
+	assert.Equal(ok, true)
+}
diff --git a/contrib/mesos/pkg/scheduler/types.go b/contrib/mesos/pkg/scheduler/types.go
index b529eec318a..36d4cc8fa57 100644
--- a/contrib/mesos/pkg/scheduler/types.go
+++ b/contrib/mesos/pkg/scheduler/types.go
@@ -57,5 +57,5 @@ var (
 )
 
 type SlaveIndex interface {
-	slaveFor(id string) (*Slave, bool)
+	slaveHostNameFor(id string) string
 }
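
For reference, a minimal usage sketch of the new slave.Registry API introduced by this patch. The import path and the NewRegistry, Register, HostName, and SlaveIDs names come from the diff above; the standalone main wiring and the sample slave ID and hostnames are illustrative only.

package main

import (
	"fmt"

	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
)

func main() {
	// Track the hostname reported with each offer, keyed by slave ID.
	registry := slave.NewRegistry()
	registry.Register("20150918-1234-S1", "node1.example.com")
	// Registering the same slave ID again is a no-op; the first hostname wins.
	registry.Register("20150918-1234-S1", "other.example.com")

	// An empty string from HostName means the slave ID is unknown,
	// which is the check doSchedule and StatusUpdate now rely on.
	if hostName := registry.HostName("20150918-1234-S1"); hostName != "" {
		fmt.Println("slave registered on", hostName)
	}

	fmt.Println("known slave IDs:", registry.SlaveIDs())
}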