scheduler: replace system modeler with scheduler cache

We're using the scheduler cache to do O(1) lookups for resource requests.
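For context on the commit message: the scheduler cache keeps per-node aggregates (for example, total requested CPU and memory) up to date as pods are added and removed, so a fit predicate can read a node's load in O(1) instead of re-summing over every pod on each scheduling pass. A minimal sketch of that idea in Go follows; the type and method names are invented for illustration and are not the actual schedulercache API.

package main

import "fmt"

// nodeInfo holds a running aggregate for one node. The real scheduler
// cache tracks much richer state; this sketch only sums CPU and memory
// requests, and the names here are invented for illustration.
type nodeInfo struct {
	requestedMilliCPU int64
	requestedMemory   int64
}

// requestCache maps node name -> aggregated requests. Reading a node's
// load during scheduling is a single map access, i.e. O(1).
type requestCache struct {
	nodes map[string]*nodeInfo
}

func newRequestCache() *requestCache {
	return &requestCache{nodes: map[string]*nodeInfo{}}
}

// addPod folds a pod's requests into the node aggregate when the pod is
// assumed or bound; a removePod counterpart would subtract them again.
func (c *requestCache) addPod(node string, milliCPU, memory int64) {
	n, ok := c.nodes[node]
	if !ok {
		n = &nodeInfo{}
		c.nodes[node] = n
	}
	n.requestedMilliCPU += milliCPU
	n.requestedMemory += memory
}

// requested answers "how loaded is this node?" without walking any pod list.
func (c *requestCache) requested(node string) (int64, int64) {
	n, ok := c.nodes[node]
	if !ok {
		return 0, 0
	}
	return n.requestedMilliCPU, n.requestedMemory
}

func main() {
	c := newRequestCache()
	c.addPod("machine1", 500, 256<<20)
	cpu, mem := c.requested("machine1")
	fmt.Println(cpu, mem)
}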
@@ -23,13 +23,18 @@ import (
 	"testing"
 	"time"
 
+	"sync"
+
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/testapi"
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
+	schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )
 
 type fakeBinder struct {
@@ -109,8 +114,8 @@ func TestScheduler(t *testing.T) {
 	var gotAssumedPod *api.Pod
 	var gotBinding *api.Binding
 	c := &Config{
-		Modeler: &FakeModeler{
-			AssumePodFunc: func(pod *api.Pod) {
+		SchedulerCache: &schedulertesting.FakeCache{
+			AssumeFunc: func(pod *api.Pod) {
 				gotAssumedPod = pod
 			},
 		},
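The schedulertesting.FakeCache used above is a function-field fake: the test injects a closure for just the method it wants to observe. A minimal sketch of the pattern, assuming a one-method cache interface (the real interface is larger and its method names may differ):

package main

import "fmt"

type Pod struct{ Name string }

// Cache is a trimmed stand-in for the scheduler's cache interface; the
// method name is an assumption made for this sketch.
type Cache interface {
	AssumePod(pod *Pod)
}

// FakeCache lets a test plug in behavior per method via function fields,
// mirroring the AssumeFunc field seen in the diff above.
type FakeCache struct {
	AssumeFunc func(*Pod)
}

func (f *FakeCache) AssumePod(pod *Pod) {
	if f.AssumeFunc != nil {
		f.AssumeFunc(pod)
	}
}

func main() {
	var gotAssumedPod *Pod
	var c Cache = &FakeCache{
		AssumeFunc: func(p *Pod) { gotAssumedPod = p },
	}
	c.AssumePod(&Pod{Name: "foo"})
	fmt.Println(gotAssumedPod.Name)
}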
@@ -161,42 +166,30 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	eventBroadcaster := record.NewBroadcaster()
 	defer eventBroadcaster.StartLogging(t.Logf).Stop()
 
-	// Setup modeler so we control the contents of all 3 stores: assumed,
-	// scheduled and queued
+	// Set up stores to test the pod's workflow:
+	// - queuedPodStore: pods queued before processing
+	// - scheduledPodStore: pods that have a scheduling decision
 	scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
-	scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore}
-
 	queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
-	queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore}
-
-	modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
-
-	// Create a fake clock used to timestamp entries and calculate ttl. Nothing
-	// will expire till we flip to something older than the ttl, at which point
-	// all entries inserted with fakeTime will expire.
-	ttl := 30 * time.Second
-	fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
-	fakeClock := util.NewFakeClock(fakeTime)
-	ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
-	assumedPodsStore := cache.NewFakeExpirationStore(
-		cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
-	modeler.assumedPods = &cache.StoreToPodLister{Store: assumedPodsStore}
 
 	// Port is the easiest way to cause a fit predicate failure
 	podPort := 8080
 	firstPod := podWithPort("foo", "", podPort)
 
+	stop := make(chan struct{})
+	defer close(stop)
+	cache := schedulercache.New(1*time.Second, stop)
 	// Create the scheduler config
 	algo := NewGenericScheduler(
+		cache,
 		map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
 		[]algorithm.PriorityConfig{},
 		[]algorithm.SchedulerExtender{},
-		modeler.PodLister(),
 		rand.New(rand.NewSource(time.Now().UnixNano())))
 
 	var gotBinding *api.Binding
 	c := &Config{
-		Modeler: modeler,
+		SchedulerCache: cache,
 		NodeLister: algorithm.FakeNodeLister(
 			api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
 		),
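Here schedulercache.New(1*time.Second, stop) takes over what the fake clock and expiration store simulated before: assumed pods now age out inside the cache itself, cleaned up by a background loop that exits when stop closes. A rough sketch of that mechanism follows; the field names, the cleanup period, and the exact semantics of New's parameters are assumptions, not the real implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// expiringCache maps pod key -> deadline after which the assumption
// lapses. Illustrative only; the real schedulercache stores NodeInfo
// aggregates alongside this bookkeeping.
type expiringCache struct {
	mu       sync.Mutex
	ttl      time.Duration
	deadline map[string]time.Time
}

func newExpiringCache(ttl time.Duration, stop <-chan struct{}) *expiringCache {
	c := &expiringCache{ttl: ttl, deadline: map[string]time.Time{}}
	go c.cleanupLoop(stop)
	return c
}

// Assume records a pod with an expiry ttl from now.
func (c *expiringCache) Assume(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.deadline[key] = time.Now().Add(c.ttl)
}

// cleanupLoop periodically drops expired assumptions, so callers never see
// a stale pod for much longer than ttl plus one cleanup period.
func (c *expiringCache) cleanupLoop(stop <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case now := <-ticker.C:
			c.mu.Lock()
			for key, d := range c.deadline {
				if now.After(d) {
					delete(c.deadline, key)
				}
			}
			c.mu.Unlock()
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	c := newExpiringCache(time.Second, stop)
	c.Assume("default/foo")
	time.Sleep(3 * time.Second) // in the test below, a poll loop replaces this sleep
	c.mu.Lock()
	fmt.Println("assumed pods left:", len(c.deadline)) // 0: the assumption expired
	c.mu.Unlock()
}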
@@ -243,10 +236,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	if exists {
 		t.Errorf("Did not expect a queued pod, found %+v", pod)
 	}
-	pod, exists, _ = assumedPodsStore.GetByKey("foo")
-	if !exists {
-		t.Errorf("Assumed pod store should contain stale pod")
-	}
 
 	expectBind := &api.Binding{
 		ObjectMeta: api.ObjectMeta{Name: "foo"},
@@ -260,10 +249,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	events.Stop()
 
 	scheduledPodStore.Delete(pod)
-	_, exists, _ = assumedPodsStore.Get(pod)
-	if !exists {
-		t.Errorf("Expected pod %#v in assumed pod store", pod)
-	}
 
 	secondPod := podWithPort("bar", "", podPort)
 	queuedPodStore.Add(secondPod)
@@ -271,10 +256,26 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	// scheduledPodStore: []
 	// assumedPods: [foo:8080]
 
+	var wg sync.WaitGroup
+	wg.Add(1)
+	// wait for the assumed pod to expire
+	go func() {
+		for {
+			pods, err := cache.List(labels.Everything())
+			if err != nil {
+				t.Fatalf("cache.List failed: %v", err)
+			}
+			if len(pods) == 0 {
+				wg.Done()
+				return
+			}
+			time.Sleep(1 * time.Second)
+		}
+	}()
+	wg.Wait()
+
 	// Second scheduling pass will fail to schedule if the store hasn't expired
 	// the deleted pod. This would normally happen with a timeout.
-	//expirationPolicy.NeverExpire = util.NewStringSet()
-	fakeClock.Step(ttl + 1)
 
 	called = make(chan struct{})
 	events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
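One caveat on the polling block added above: the loop has no deadline, and t.Fatalf is only reliable when called from the goroutine running the test. A bounded version of the same wait, sketched as a hypothetical helper (not code from this commit):

package main

import (
	"fmt"
	"time"
)

// waitUntil polls check until it returns true or timeout elapses, and
// reports which happened, so the test can fail from its own goroutine.
func waitUntil(check func() bool, timeout time.Duration) bool {
	deadline := time.After(timeout)
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			if check() {
				return true
			}
		case <-deadline:
			return false
		}
	}
}

func main() {
	start := time.Now()
	ok := waitUntil(func() bool { return time.Since(start) > time.Second }, 5*time.Second)
	fmt.Println("condition met:", ok)
}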