Merge pull request #429 from nyaxt/threadsafe_sched

Make {random,firstfit} scheduler thread safe.
This commit is contained in:
Clayton Coleman 2014-07-13 10:04:43 -04:00
commit 8db1b55b84
4 changed files with 28 additions and 21 deletions

View File

@ -84,7 +84,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30) podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30)
go podCache.Loop() go podCache.Loop()
s := scheduler.MakeFirstFitScheduler(m.podRegistry, m.random) s := scheduler.MakeRandomFitScheduler(m.podRegistry, m.random)
m.storage = map[string]apiserver.RESTStorage{ m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache), "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache),
"replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry), "replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry),

View File

@ -18,14 +18,15 @@ package scheduler
import ( import (
"math/rand" "math/rand"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
) )
// RandomScheduler chooses machines uniformly at random. // RandomScheduler chooses machines uniformly at random.
type RandomScheduler struct { type RandomScheduler struct {
// TODO: rand.Rand is *NOT* thread safe.
random *rand.Rand random *rand.Rand
randomLock sync.Mutex
} }
func MakeRandomScheduler(random *rand.Rand) Scheduler { func MakeRandomScheduler(random *rand.Rand) Scheduler {
@ -40,5 +41,8 @@ func (s *RandomScheduler) Schedule(pod api.Pod, minionLister MinionLister) (stri
if err != nil { if err != nil {
return "", err return "", err
} }
s.randomLock.Lock()
defer s.randomLock.Unlock()
return machines[s.random.Int()%len(machines)], nil return machines[s.random.Int()%len(machines)], nil
} }

View File

@ -19,26 +19,27 @@ package scheduler
import ( import (
"fmt" "fmt"
"math/rand" "math/rand"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
) )
// FirstFitScheduler is a Scheduler interface implementation which uses first fit algorithm. // RandomFitScheduler is a Scheduler which schedules a Pod on a random machine which matches its requirement.
type FirstFitScheduler struct { type RandomFitScheduler struct {
podLister PodLister podLister PodLister
// TODO: *rand.Rand is *not* threadsafe
random *rand.Rand random *rand.Rand
randomLock sync.Mutex
} }
func MakeFirstFitScheduler(podLister PodLister, random *rand.Rand) Scheduler { func MakeRandomFitScheduler(podLister PodLister, random *rand.Rand) Scheduler {
return &FirstFitScheduler{ return &RandomFitScheduler{
podLister: podLister, podLister: podLister,
random: random, random: random,
} }
} }
func (s *FirstFitScheduler) containsPort(pod api.Pod, port api.Port) bool { func (s *RandomFitScheduler) containsPort(pod api.Pod, port api.Port) bool {
for _, container := range pod.DesiredState.Manifest.Containers { for _, container := range pod.DesiredState.Manifest.Containers {
for _, podPort := range container.Ports { for _, podPort := range container.Ports {
if podPort.HostPort == port.HostPort { if podPort.HostPort == port.HostPort {
@ -49,8 +50,8 @@ func (s *FirstFitScheduler) containsPort(pod api.Pod, port api.Port) bool {
return false return false
} }
// Schedule schedules a pod on the first machine which matches its requirement. // Schedule schedules a pod on a random machine which matches its requirement.
func (s *FirstFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { func (s *RandomFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
machines, err := minionLister.List() machines, err := minionLister.List()
if err != nil { if err != nil {
return "", err return "", err
@ -83,5 +84,7 @@ func (s *FirstFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (st
if len(machineOptions) == 0 { if len(machineOptions) == 0 {
return "", fmt.Errorf("failed to find fit for %#v", pod) return "", fmt.Errorf("failed to find fit for %#v", pod)
} }
s.randomLock.Lock()
defer s.randomLock.Unlock()
return machineOptions[s.random.Int()%len(machineOptions)], nil return machineOptions[s.random.Int()%len(machineOptions)], nil
} }

View File

@ -23,31 +23,31 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
) )
func TestFirstFitSchedulerNothingScheduled(t *testing.T) { func TestRandomFitSchedulerNothingScheduled(t *testing.T) {
fakeRegistry := FakePodLister{} fakeRegistry := FakePodLister{}
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeFirstFitScheduler(&fakeRegistry, r), scheduler: MakeRandomFitScheduler(&fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectSchedule(api.Pod{}, "m3") st.expectSchedule(api.Pod{}, "m3")
} }
func TestFirstFitSchedulerFirstScheduled(t *testing.T) { func TestRandomFitSchedulerFirstScheduled(t *testing.T) {
fakeRegistry := FakePodLister{ fakeRegistry := FakePodLister{
makePod("m1", 8080), makePod("m1", 8080),
} }
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeFirstFitScheduler(fakeRegistry, r), scheduler: MakeRandomFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectSchedule(makePod("", 8080), "m3") st.expectSchedule(makePod("", 8080), "m3")
} }
func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) { func TestRandomFitSchedulerFirstScheduledComplicated(t *testing.T) {
fakeRegistry := FakePodLister{ fakeRegistry := FakePodLister{
makePod("m1", 80, 8080), makePod("m1", 80, 8080),
makePod("m2", 8081, 8082, 8083), makePod("m2", 8081, 8082, 8083),
@ -56,13 +56,13 @@ func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) {
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeFirstFitScheduler(fakeRegistry, r), scheduler: MakeRandomFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectSchedule(makePod("", 8080, 8081), "m3") st.expectSchedule(makePod("", 8080, 8081), "m3")
} }
func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) { func TestRandomFitSchedulerFirstScheduledImpossible(t *testing.T) {
fakeRegistry := FakePodLister{ fakeRegistry := FakePodLister{
makePod("m1", 8080), makePod("m1", 8080),
makePod("m2", 8081), makePod("m2", 8081),
@ -71,7 +71,7 @@ func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) {
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeFirstFitScheduler(fakeRegistry, r), scheduler: MakeRandomFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectFailure(makePod("", 8080, 8081)) st.expectFailure(makePod("", 8080, 8081))