kube-scheduler: move stopCh creation out of scheduler factory code

Enforces clean ownership of the channel.
This commit is contained in:
Dr. Stefan Schimanski 2018-10-11 10:49:31 +02:00
parent cb95edafe8
commit d91feb6d18
12 changed files with 79 additions and 59 deletions

View File

@ -160,6 +160,7 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
storageClassInformer,
c.Recorder,
c.ComponentConfig.AlgorithmSource,
stopCh,
scheduler.WithName(c.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(c.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(c.ComponentConfig.EnableContentionProfiling),

View File

@ -37,6 +37,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -127,7 +128,7 @@ type Config struct {
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything chan struct{}
StopEverything <-chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
@ -200,7 +201,7 @@ type configFactory struct {
storageClassLister storagelisters.StorageClassLister
// Close this to stop all reflectors
StopEverything chan struct{}
StopEverything <-chan struct{}
scheduledPodsHasSynced cache.InformerSynced
@ -253,12 +254,16 @@ type ConfigFactoryArgs struct {
DisablePreemption bool
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
StopCh <-chan struct{}
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := make(chan struct{})
stopEverything := args.StopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate

View File

@ -52,7 +52,9 @@ const (
func TestCreate(t *testing.T) {
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
factory.Create()
}
@ -63,7 +65,9 @@ func TestCreateFromConfig(t *testing.T) {
var policy schedulerapi.Policy
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
@ -101,7 +105,9 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
var policy schedulerapi.Policy
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
@ -140,7 +146,9 @@ func TestCreateFromEmptyConfig(t *testing.T) {
var policy schedulerapi.Policy
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
configData = []byte(`{}`)
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
@ -155,7 +163,9 @@ func TestCreateFromEmptyConfig(t *testing.T) {
// The predicate/priority from DefaultProvider will be used.
func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterFitPredicate("PredicateOne", PredicateOne)
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
@ -188,7 +198,9 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
// Empty predicate/priority sets will be used.
func TestCreateFromConfigWithEmptyPredicatesOrPriorities(t *testing.T) {
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterFitPredicate("PredicateOne", PredicateOne)
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
@ -240,7 +252,9 @@ func TestDefaultErrorFunc(t *testing.T) {
Spec: apitesting.V1DeepEqualSafePodSpec(),
}
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue)
@ -370,7 +384,9 @@ func testBind(binding *v1.Binding, t *testing.T) {
func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
client := fake.NewSimpleClientset()
// factory of "default-scheduler"
factory := newConfigFactory(client, -1)
stopCh := make(chan struct{})
factory := newConfigFactory(client, -1, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: invalid hardPodAffinitySymmetricWeight, got nothing")
@ -399,7 +415,9 @@ func TestInvalidFactoryArgs(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
factory := newConfigFactory(client, test.hardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
factory := newConfigFactory(client, test.hardPodAffinitySymmetricWeight, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: %s, got nothing", test.expectErr)
@ -501,7 +519,7 @@ func TestSkipPodUpdate(t *testing.T) {
}
}
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32) Configurator {
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
v1.DefaultSchedulerName,
@ -521,6 +539,7 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
disablePodPreemption,
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,
stopCh,
})
}

View File

@ -59,12 +59,6 @@ type Scheduler struct {
config *factory.Config
}
// StopEverything closes the scheduler config's StopEverything channel, to shut
// down the Scheduler.
func (sched *Scheduler) StopEverything() {
close(sched.config.StopEverything)
}
// Cache returns the cache in scheduler for test to check the data in scheduler.
func (sched *Scheduler) Cache() schedulerinternalcache.Cache {
return sched.config.SchedulerCache
@ -147,6 +141,7 @@ func New(client clientset.Interface,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
options := defaultSchedulerOptions
@ -230,6 +225,7 @@ func New(client clientset.Interface,
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil

View File

@ -175,6 +175,8 @@ func TestSchedulerCreation(t *testing.T) {
factory.RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
factory.RegisterAlgorithmProvider(testSource, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
stopCh := make(chan struct{})
defer close(stopCh)
_, err := New(client,
informerFactory.Core().V1().Nodes(),
factory.NewPodInformer(client, 0),
@ -188,6 +190,7 @@ func TestSchedulerCreation(t *testing.T) {
informerFactory.Storage().V1().StorageClasses(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh,
WithBindTimeoutSeconds(defaultBindTimeout))
if err != nil {

View File

@ -349,7 +349,7 @@ func TestSchedulerExtender(t *testing.T) {
}
policy.APIVersion = "v1"
context = initTestScheduler(t, context, nil, false, &policy)
context = initTestScheduler(t, context, false, &policy)
defer cleanupTest(t, context)
DoTestPodScheduling(context.ns, t, clientSet)

View File

@ -522,7 +522,10 @@ func TestMultiScheduler(t *testing.T) {
informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
podInformer2 := factory.NewPodInformer(context.clientSet, 0)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2)
stopCh := make(chan struct{})
defer close(stopCh)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, stopCh)
schedulerConfig2, err := schedulerConfigFactory2.Create()
if err != nil {
t.Errorf("Couldn't create scheduler config: %v", err)
@ -530,12 +533,11 @@ func TestMultiScheduler(t *testing.T) {
eventBroadcaster2 := record.NewBroadcaster()
schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet2.CoreV1().Events("")})
go podInformer2.Informer().Run(schedulerConfig2.StopEverything)
informerFactory2.Start(schedulerConfig2.StopEverything)
go podInformer2.Informer().Run(stopCh)
informerFactory2.Start(stopCh)
sched2, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig2}, nil...)
sched2.Run()
defer close(schedulerConfig2.StopEverything)
// 6. **check point-2**:
// - testPodWithAnnotationFitsFoo should be scheduled

View File

@ -85,13 +85,10 @@ func TestTaintNodeByCondition(t *testing.T) {
admission.SetExternalKubeClientSet(externalClientset)
admission.SetExternalKubeInformerFactory(externalInformers)
controllerCh := make(chan struct{})
defer close(controllerCh)
// Apply feature gates to enable TaintNodesByCondition
algorithmprovider.ApplyFeatureGates()
context = initTestScheduler(t, context, controllerCh, false, nil)
context = initTestScheduler(t, context, false, nil)
cs := context.clientSet
informers := context.informerFactory
nsName := context.ns.Name
@ -120,13 +117,13 @@ func TestTaintNodeByCondition(t *testing.T) {
t.Errorf("Failed to create node controller: %v", err)
return
}
go nc.Run(controllerCh)
go nc.Run(context.stopCh)
// Waiting for all controller sync.
externalInformers.Start(controllerCh)
externalInformers.WaitForCacheSync(controllerCh)
informers.Start(controllerCh)
informers.WaitForCacheSync(controllerCh)
externalInformers.Start(context.stopCh)
externalInformers.WaitForCacheSync(context.stopCh)
informers.Start(context.stopCh)
informers.WaitForCacheSync(context.stopCh)
// -------------------------------------------
// Test TaintNodeByCondition feature.

View File

@ -65,6 +65,7 @@ type TestContext struct {
schedulerConfigFactory factory.Configurator
schedulerConfig *factory.Config
scheduler *scheduler.Scheduler
stopCh chan struct{}
}
// createConfiguratorWithPodInformer creates a configurator for scheduler.
@ -73,6 +74,7 @@ func createConfiguratorWithPodInformer(
clientSet clientset.Interface,
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
stopCh <-chan struct{},
) factory.Configurator {
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: schedulerName,
@ -92,13 +94,16 @@ func createConfiguratorWithPodInformer(
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: 600,
StopCh: stopCh,
})
}
// initTestMasterAndScheduler initializes a test environment and creates a master with default
// configuration.
func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext {
var context TestContext
context := TestContext{
stopCh: make(chan struct{}),
}
// 1. Create master
h := &framework.MasterHolder{Initialized: make(chan struct{})}
@ -138,13 +143,12 @@ func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface
func initTestScheduler(
t *testing.T,
context *TestContext,
controllerCh chan struct{},
setPodInformer bool,
policy *schedulerapi.Policy,
) *TestContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, false, time.Second)
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, false, false, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -152,7 +156,6 @@ func initTestScheduler(
func initTestSchedulerWithOptions(
t *testing.T,
context *TestContext,
controllerCh chan struct{},
setPodInformer bool,
policy *schedulerapi.Policy,
disablePreemption bool,
@ -179,7 +182,7 @@ func initTestSchedulerWithOptions(
}
context.schedulerConfigFactory = createConfiguratorWithPodInformer(
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory)
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, context.stopCh)
var err error
@ -193,11 +196,6 @@ func initTestSchedulerWithOptions(
t.Fatalf("Couldn't create scheduler config: %v", err)
}
// set controllerCh if provided.
if controllerCh != nil {
context.schedulerConfig.StopEverything = controllerCh
}
// set DisablePreemption option
context.schedulerConfig.DisablePreemption = disablePreemption
@ -252,21 +250,21 @@ func initDisruptionController(context *TestContext) *disruption.DisruptionContro
// initTest initializes a test environment and creates master and scheduler with default
// configuration.
func initTest(t *testing.T, nsPrefix string) *TestContext {
return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), nil, true, nil)
return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), true, nil)
}
// initTestDisablePreemption initializes a test environment and creates master and scheduler with default
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, false, time.Second)
t, initTestMaster(t, nsPrefix, nil), true, nil, true, false, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called
// at the end of a test.
func cleanupTest(t *testing.T, context *TestContext) {
// Kill the scheduler.
close(context.schedulerConfig.StopEverything)
close(context.stopCh)
// Cleanup nodes.
context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
framework.DeleteTestingNamespace(context.ns, context.httpServer, t)

View File

@ -901,9 +901,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
// Set feature gates
utilfeature.DefaultFeatureGate.SetFromMap(features)
controllerCh := make(chan struct{})
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, disableEquivalenceCache, resyncPeriod)
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, false, disableEquivalenceCache, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name
@ -912,10 +910,10 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
if err != nil {
t.Fatalf("Failed to create PV controller: %v", err)
}
go ctrl.Run(controllerCh)
go ctrl.Run(context.stopCh)
// Start informer factory after all controllers are configured and running.
informerFactory.Start(controllerCh)
informerFactory.WaitForCacheSync(controllerCh)
informerFactory.Start(context.stopCh)
informerFactory.WaitForCacheSync(context.stopCh)
// Create shared objects
// Create nodes
@ -936,7 +934,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
return &testConfig{
client: clientset,
ns: ns,
stop: controllerCh,
stop: context.stopCh,
teardown: func() {
deleteTestObjects(clientset, ns, nil)
cleanupTest(t, context)

View File

@ -66,7 +66,8 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
evtWatch := evtBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
Interface: clientSet.CoreV1().Events("")})
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory)
stopCh := make(chan struct{})
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory, stopCh)
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *factory.Config) {
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
@ -75,16 +76,13 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
glog.Fatalf("Error creating scheduler: %v", err)
}
stop := make(chan struct{})
informerFactory.Start(stop)
informerFactory.Start(stopCh)
sched.Run()
shutdownFunc := func() {
glog.Infof("destroying scheduler")
evtWatch.Stop()
sched.StopEverything()
close(stop)
close(stopCh)
glog.Infof("destroyed scheduler")
}
return schedulerConfigurator, shutdownFunc
@ -94,6 +92,7 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
func createSchedulerConfigurator(
clientSet clientset.Interface,
informerFactory informers.SharedInformerFactory,
stopCh <-chan struct{},
) factory.Configurator {
// Enable EnableEquivalenceClassCache for all integration tests.
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")
@ -115,5 +114,6 @@ func createSchedulerConfigurator(
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
StopCh: stopCh,
})
}