Extract slave hostname registry code in its own module

- remove bleeding of registry-internal objects, without any locking
- rename from SlaveStorage to Registry which fits much better to what
  it actually does
This commit is contained in:
Dr. Stefan Schimanski 2015-09-15 16:37:14 +02:00
parent c9570e34d0
commit eb5a5ffc28
8 changed files with 200 additions and 148 deletions

View File

@ -33,13 +33,12 @@ type MockScheduler struct {
mock.Mock
}
func (m *MockScheduler) slaveFor(id string) (slave *Slave, ok bool) {
func (m *MockScheduler) slaveHostNameFor(id string) (hostName string) {
args := m.Called(id)
x := args.Get(0)
if x != nil {
slave = x.(*Slave)
hostName = x.(string)
}
ok = args.Bool(1)
return
}
func (m *MockScheduler) algorithm() (f PodScheduler) {
@ -87,8 +86,8 @@ func (m *MockScheduler) launchTask(task *podtask.T) error {
// @deprecated this is a placeholder for me to test the mock package
func TestNoSlavesYet(t *testing.T) {
obj := &MockScheduler{}
obj.On("slaveFor", "foo").Return(nil, false)
obj.slaveFor("foo")
obj.On("slaveHostNameFor", "foo").Return(nil)
obj.slaveHostNameFor("foo")
obj.AssertExpectations(t)
}

View File

@ -92,9 +92,8 @@ func (k *k8smScheduler) createPodTask(ctx api.Context, pod *api.Pod) (*podtask.T
return podtask.New(ctx, "", *pod, k.internal.executor)
}
func (k *k8smScheduler) slaveFor(id string) (slave *Slave, ok bool) {
slave, ok = k.internal.slaves.getSlave(id)
return
func (k *k8smScheduler) slaveHostNameFor(id string) string {
return k.internal.slaveHostNames.HostName(id)
}
func (k *k8smScheduler) killTask(taskId string) error {
@ -326,7 +325,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
}
slaveId := details.GetSlaveId().GetValue()
if slave, ok := k.api.slaveFor(slaveId); !ok {
if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" {
// not much sense in Release()ing the offer here since its owner died
offer.Release()
k.api.offers().Invalidate(details.Id.GetValue())
@ -343,7 +342,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
offer.Release()
return "", err
}
return slave.HostName, nil
return slaveHostName, nil
}
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
@ -50,54 +51,6 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
)
type Slave struct {
HostName string
}
func newSlave(hostName string) *Slave {
return &Slave{
HostName: hostName,
}
}
type slaveStorage struct {
sync.Mutex
slaves map[string]*Slave // SlaveID => slave.
}
func newSlaveStorage() *slaveStorage {
return &slaveStorage{
slaves: make(map[string]*Slave),
}
}
// Create a mapping between a slaveID and slave if not existing.
func (self *slaveStorage) checkAndAdd(slaveId, slaveHostname string) {
self.Lock()
defer self.Unlock()
_, exists := self.slaves[slaveId]
if !exists {
self.slaves[slaveId] = newSlave(slaveHostname)
}
}
func (self *slaveStorage) getSlaveIds() []string {
self.Lock()
defer self.Unlock()
slaveIds := make([]string, 0, len(self.slaves))
for slaveID := range self.slaves {
slaveIds = append(slaveIds, slaveID)
}
return slaveIds
}
func (self *slaveStorage) getSlave(slaveId string) (*Slave, bool) {
self.Lock()
defer self.Unlock()
slave, exists := self.slaves[slaveId]
return slave, exists
}
type PluginInterface interface {
// the apiserver may have a different state for the pod than we do
// so reconcile our records, but only for this one pod
@ -138,7 +91,7 @@ type KubernetesScheduler struct {
registration chan struct{} // signal chan that closes upon first successful registration
onRegistration sync.Once
offers offers.Registry
slaves *slaveStorage
slaveHostNames *slave.Registry
// unsafe state, needs to be guarded
@ -205,7 +158,7 @@ func New(config Config) *KubernetesScheduler {
TTL: config.Schedcfg.OfferTTL.Duration,
ListenerDelay: config.Schedcfg.ListenerDelay.Duration,
}),
slaves: newSlaveStorage(),
slaveHostNames: slave.NewRegistry(),
taskRegistry: podtask.NewInMemoryRegistry(),
reconcileCooldown: config.ReconcileCooldown,
registration: make(chan struct{}),
@ -270,7 +223,7 @@ func (k *KubernetesScheduler) InstallDebugHandlers(mux *http.ServeMux) {
requestReconciliation("/debug/actions/requestImplicit", k.reconciler.RequestImplicit)
wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slaves := k.slaves.getSlaveIds()
slaves := k.slaveHostNames.SlaveIDs()
for _, slaveId := range slaves {
_, err := k.driver.SendFrameworkMessage(
k.executor.ExecutorId,
@ -368,7 +321,7 @@ func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, of
k.offers.Add(offers)
for _, offer := range offers {
slaveId := offer.GetSlaveId().GetValue()
k.slaves.checkAndAdd(slaveId, offer.GetHostname())
k.slaveHostNames.Register(slaveId, offer.GetHostname())
}
}
@ -423,7 +376,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
} // else, we don't really care about FINISHED tasks that aren't registered
return
}
if _, exists := k.slaves.getSlave(taskStatus.GetSlaveId().GetValue()); !exists {
if hostName := k.slaveHostNames.HostName(taskStatus.GetSlaveId().GetValue()); hostName != "" {
// a registered task has an update reported by a slave that we don't recognize.
// this should never happen! So we don't reconcile it.
log.Errorf("Ignore status %+v because the slave does not exist", taskStatus)
@ -920,7 +873,7 @@ func (ks *KubernetesScheduler) recoverTasks() error {
recoverSlave := func(t *podtask.T) {
slaveId := t.Spec.SlaveID
ks.slaves.checkAndAdd(slaveId, t.Offer.Host())
ks.slaveHostNames.Register(slaveId, t.Offer.Host())
}
for _, pod := range podList.Items {
if t, ok, err := podtask.RecoverFrom(pod); err != nil {

View File

@ -26,76 +26,9 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/proc"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
)
// Check that same slave is only added once.
func TestSlaveStorage_checkAndAdd(t *testing.T) {
assert := assert.New(t)
slaveStorage := newSlaveStorage()
assert.Equal(0, len(slaveStorage.slaves))
slaveId := "slave1"
slaveHostname := "slave1Hostname"
slaveStorage.checkAndAdd(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.getSlaveIds()))
slaveStorage.checkAndAdd(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.getSlaveIds()))
}
// Check that getSlave returns notExist for nonexisting slave.
func TestSlaveStorage_getSlave(t *testing.T) {
assert := assert.New(t)
slaveStorage := newSlaveStorage()
assert.Equal(0, len(slaveStorage.slaves))
slaveId := "slave1"
slaveHostname := "slave1Hostname"
_, exists := slaveStorage.getSlave(slaveId)
assert.Equal(false, exists)
slaveStorage.checkAndAdd(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.getSlaveIds()))
_, exists = slaveStorage.getSlave(slaveId)
assert.Equal(true, exists)
}
// Check that getSlaveIds returns array with all slaveIds.
func TestSlaveStorage_getSlaveIds(t *testing.T) {
assert := assert.New(t)
slaveStorage := newSlaveStorage()
assert.Equal(0, len(slaveStorage.slaves))
slaveId := "1"
slaveHostname := "hn1"
slaveStorage.checkAndAdd(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.getSlaveIds()))
slaveId = "2"
slaveHostname = "hn2"
slaveStorage.checkAndAdd(slaveId, slaveHostname)
assert.Equal(2, len(slaveStorage.getSlaveIds()))
slaveIds := slaveStorage.getSlaveIds()
slaveIdsMap := make(map[string]bool, len(slaveIds))
for _, s := range slaveIds {
slaveIdsMap[s] = true
}
_, ok := slaveIdsMap["1"]
assert.Equal(ok, true)
_, ok = slaveIdsMap["2"]
assert.Equal(ok, true)
}
//get number of non-expired offers from offer registry
func getNumberOffers(os offers.Registry) int {
//walk offers and check it is stored in registry
@ -126,7 +59,7 @@ func TestResourceOffer_Add(t *testing.T) {
TTL: schedcfg.DefaultOfferTTL,
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaves: newSlaveStorage(),
slaveHostNames: slave.NewRegistry(),
}
hostname := "h1"
@ -137,7 +70,7 @@ func TestResourceOffer_Add(t *testing.T) {
assert.Equal(1, getNumberOffers(testScheduler.offers))
//check slave hostname
assert.Equal(1, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs()))
//add another offer
hostname2 := "h2"
@ -149,7 +82,7 @@ func TestResourceOffer_Add(t *testing.T) {
assert.Equal(2, getNumberOffers(testScheduler.offers))
//check slave hostnames
assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
}
//test adding of ressource offer, should be added to offer registry and slavesf
@ -169,7 +102,7 @@ func TestResourceOffer_Add_Rescind(t *testing.T) {
TTL: schedcfg.DefaultOfferTTL,
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaves: newSlaveStorage(),
slaveHostNames: slave.NewRegistry(),
}
hostname := "h1"
@ -181,7 +114,7 @@ func TestResourceOffer_Add_Rescind(t *testing.T) {
assert.Equal(1, getNumberOffers(testScheduler.offers))
//check slave hostname
assert.Equal(1, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs()))
//add another offer
hostname2 := "h2"
@ -192,7 +125,7 @@ func TestResourceOffer_Add_Rescind(t *testing.T) {
assert.Equal(2, getNumberOffers(testScheduler.offers))
//check slave hostnames
assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
//next whether offers can be rescinded
testScheduler.OfferRescinded(nil, offerID1)
@ -222,7 +155,7 @@ func TestSlave_Lost(t *testing.T) {
TTL: schedcfg.DefaultOfferTTL,
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaves: newSlaveStorage(),
slaveHostNames: slave.NewRegistry(),
}
hostname := "h1"
@ -241,7 +174,7 @@ func TestSlave_Lost(t *testing.T) {
//test precondition
assert.Equal(3, getNumberOffers(testScheduler.offers))
assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
//remove first slave
testScheduler.SlaveLost(nil, util.NewSlaveID(hostname))
@ -249,7 +182,7 @@ func TestSlave_Lost(t *testing.T) {
//offers should be removed
assert.Equal(1, getNumberOffers(testScheduler.offers))
//slave hostnames should still be all present
assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
//remove second slave
testScheduler.SlaveLost(nil, util.NewSlaveID(hostname2))
@ -257,7 +190,7 @@ func TestSlave_Lost(t *testing.T) {
//offers should be removed
assert.Equal(0, getNumberOffers(testScheduler.offers))
//slave hostnames should still be all present
assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
//try to remove non existing slave
testScheduler.SlaveLost(nil, util.NewSlaveID("notExist"))
@ -279,7 +212,7 @@ func TestDisconnect(t *testing.T) {
TTL: schedcfg.DefaultOfferTTL,
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaves: newSlaveStorage(),
slaveHostNames: slave.NewRegistry(),
}
hostname := "h1"
@ -302,7 +235,7 @@ func TestDisconnect(t *testing.T) {
//all offers should be removed
assert.Equal(0, getNumberOffers(testScheduler.offers))
//slave hostnames should still be all present
assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
}
//test we can handle different status updates, TODO check state transitions
@ -322,7 +255,7 @@ func TestStatus_Update(t *testing.T) {
TTL: schedcfg.DefaultOfferTTL,
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaves: newSlaveStorage(),
slaveHostNames: slave.NewRegistry(),
driver: &mockdriver,
taskRegistry: podtask.NewInMemoryRegistry(),
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package slave manages node hostnames for slave ids.
package slave

View File

@ -0,0 +1,60 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package slave
import (
"sync"
)
type Registry struct {
lock sync.Mutex
hostNames map[string]string
}
func NewRegistry() *Registry {
return &Registry{
hostNames: map[string]string{},
}
}
// Register creates a mapping between a slaveId and slave if not existing.
func (st *Registry) Register(slaveId, slaveHostname string) {
st.lock.Lock()
defer st.lock.Unlock()
_, exists := st.hostNames[slaveId]
if !exists {
st.hostNames[slaveId] = slaveHostname
}
}
// SlaveIDs returns the keys of the registry
func (st *Registry) SlaveIDs() []string {
st.lock.Lock()
defer st.lock.Unlock()
slaveIds := make([]string, 0, len(st.hostNames))
for slaveID := range st.hostNames {
slaveIds = append(slaveIds, slaveID)
}
return slaveIds
}
// HostName looks up a hostname for a given slaveId
func (st *Registry) HostName(slaveId string) string {
st.lock.Lock()
defer st.lock.Unlock()
return st.hostNames[slaveId]
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package slave
import (
"testing"
"github.com/stretchr/testify/assert"
)
// Check that same slave is only added once.
func TestSlaveStorage_Register(t *testing.T) {
assert := assert.New(t)
slaveStorage := NewRegistry()
assert.Equal(0, len(slaveStorage.hostNames))
slaveId := "slave1"
slaveHostname := "slave1Hostname"
slaveStorage.Register(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.SlaveIDs()))
slaveStorage.Register(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.SlaveIDs()))
}
// Check that GetHostName returns empty hostname for nonexisting slave.
func TestSlaveStorage_HostName(t *testing.T) {
assert := assert.New(t)
slaveStorage := NewRegistry()
assert.Equal(0, len(slaveStorage.hostNames))
slaveId := "slave1"
slaveHostname := "slave1Hostname"
h := slaveStorage.HostName(slaveId)
assert.Equal(h, "")
slaveStorage.Register(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.SlaveIDs()))
h = slaveStorage.HostName(slaveId)
assert.Equal(h, slaveHostname)
}
// Check that GetSlaveIds returns array with all slaveIds.
func TestSlaveStorage_SlaveIds(t *testing.T) {
assert := assert.New(t)
slaveStorage := NewRegistry()
assert.Equal(0, len(slaveStorage.hostNames))
slaveId := "1"
slaveHostname := "hn1"
slaveStorage.Register(slaveId, slaveHostname)
assert.Equal(1, len(slaveStorage.SlaveIDs()))
slaveId = "2"
slaveHostname = "hn2"
slaveStorage.Register(slaveId, slaveHostname)
assert.Equal(2, len(slaveStorage.SlaveIDs()))
slaveIds := slaveStorage.SlaveIDs()
slaveIdsMap := make(map[string]bool, len(slaveIds))
for _, s := range slaveIds {
slaveIdsMap[s] = true
}
_, ok := slaveIdsMap["1"]
assert.Equal(ok, true)
_, ok = slaveIdsMap["2"]
assert.Equal(ok, true)
}

View File

@ -57,5 +57,5 @@ var (
)
type SlaveIndex interface {
slaveFor(id string) (*Slave, bool)
slaveHostNameFor(id string) string
}