scheduler: make ApplyFeatureGates() stateless

This commit is contained in:
Wei Huang 2019-07-30 11:41:57 -07:00
parent 1871f75b32
commit eb3ed24853
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
6 changed files with 80 additions and 15 deletions

View File

@ -57,7 +57,11 @@ func defaultPredicates() sets.String {
} }
// ApplyFeatureGates applies algorithm by feature gates. // ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() { // The returned function is used to restore the state of registered predicates/priorities
// when this function is called, and should be called in tests which may modify the value
// of a feature gate temporarily.
func ApplyFeatureGates() (restore func()) {
snapshot := factory.Copy()
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) { if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
// Remove "CheckNodeCondition", "CheckNodeMemoryPressure", "CheckNodePIDPressure" // Remove "CheckNodeCondition", "CheckNodeMemoryPressure", "CheckNodePIDPressure"
// and "CheckNodeDiskPressure" predicates // and "CheckNodeDiskPressure" predicates
@ -105,6 +109,11 @@ func ApplyFeatureGates() {
// Register the priority function to specific provider too. // Register the priority function to specific provider too.
factory.InsertPriorityKeyToAlgorithmProviderMap(factory.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, priorities.ResourceLimitsPriorityMap, nil, 1)) factory.InsertPriorityKeyToAlgorithmProviderMap(factory.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, priorities.ResourceLimitsPriorityMap, nil, 1))
} }
restore = func() {
factory.Apply(snapshot)
}
return
} }
func registerAlgorithmProvider(predSet, priSet sets.String) { func registerAlgorithmProvider(predSet, priSet sets.String) {

View File

@ -21,6 +21,6 @@ import (
) )
// ApplyFeatureGates applies algorithm by feature gates. // ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() { func ApplyFeatureGates() func() {
defaults.ApplyFeatureGates() return defaults.ApplyFeatureGates()
} }

View File

@ -94,7 +94,7 @@ func TestApplyFeatureGates(t *testing.T) {
// Apply features for algorithm providers. // Apply features for algorithm providers.
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TaintNodesByCondition, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TaintNodesByCondition, true)()
ApplyFeatureGates() defer ApplyFeatureGates()()
for _, pn := range algorithmProviderNames { for _, pn := range algorithmProviderNames {
t.Run(pn, func(t *testing.T) { t.Run(pn, func(t *testing.T) {

View File

@ -101,6 +101,60 @@ type AlgorithmProviderConfig struct {
PriorityFunctionKeys sets.String PriorityFunctionKeys sets.String
} }
// Snapshot is used to store current state of registered predicates and priorities.
type Snapshot struct {
fitPredicateMap map[string]FitPredicateFactory
mandatoryFitPredicates sets.String
priorityFunctionMap map[string]PriorityConfigFactory
algorithmProviderMap map[string]AlgorithmProviderConfig
}
// Copy returns a snapshot of current registered predicates and priorities.
func Copy() *Snapshot {
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
copy := Snapshot{
fitPredicateMap: make(map[string]FitPredicateFactory),
mandatoryFitPredicates: sets.NewString(),
priorityFunctionMap: make(map[string]PriorityConfigFactory),
algorithmProviderMap: make(map[string]AlgorithmProviderConfig),
}
for k, v := range fitPredicateMap {
copy.fitPredicateMap[k] = v
}
for k := range mandatoryFitPredicates {
copy.mandatoryFitPredicates[k] = struct{}{}
}
for k, v := range priorityFunctionMap {
copy.priorityFunctionMap[k] = v
}
for provider, config := range algorithmProviderMap {
copyPredKeys, copyPrioKeys := sets.NewString(), sets.NewString()
for k := range config.FitPredicateKeys {
copyPredKeys[k] = struct{}{}
}
for k := range config.PriorityFunctionKeys {
copyPrioKeys[k] = struct{}{}
}
copy.algorithmProviderMap[provider] = AlgorithmProviderConfig{
FitPredicateKeys: copyPredKeys,
PriorityFunctionKeys: copyPrioKeys,
}
}
return &copy
}
// Apply sets state of predicates and priorities to `s`.
func Apply(s *Snapshot) {
schedulerFactoryMutex.Lock()
fitPredicateMap = s.fitPredicateMap
mandatoryFitPredicates = s.mandatoryFitPredicates
priorityFunctionMap = s.priorityFunctionMap
algorithmProviderMap = s.algorithmProviderMap
schedulerFactoryMutex.Unlock()
}
// RegisterFitPredicate registers a fit predicate with the algorithm // RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered. // registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string { func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {

View File

@ -89,14 +89,15 @@ func setupScheduler(
cs clientset.Interface, cs clientset.Interface,
informerFactory informers.SharedInformerFactory, informerFactory informers.SharedInformerFactory,
stopCh chan struct{}, stopCh chan struct{},
) { ) (restoreFeatureGates func()) {
restoreFeatureGates = func() {}
// If ScheduleDaemonSetPods is disabled, do not start scheduler. // If ScheduleDaemonSetPods is disabled, do not start scheduler.
if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
return return
} }
// Enable Features. // Enable Features.
algorithmprovider.ApplyFeatureGates() restoreFeatureGates = algorithmprovider.ApplyFeatureGates()
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: cs.EventsV1beta1().Events(""), Interface: cs.EventsV1beta1().Events(""),
@ -136,6 +137,7 @@ func setupScheduler(
eventBroadcaster.StartRecordingToSink(stopCh) eventBroadcaster.StartRecordingToSink(stopCh)
go sched.Run() go sched.Run()
return
} }
func testLabels() map[string]string { func testLabels() map[string]string {
@ -513,7 +515,7 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
defer close(stopCh) defer close(stopCh)
// Start Scheduler // Start Scheduler
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
informers.Start(stopCh) informers.Start(stopCh)
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
@ -557,7 +559,7 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
// Start Scheduler // Start Scheduler
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
@ -595,7 +597,7 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
// Start Scheduler // Start Scheduler
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
@ -665,7 +667,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
// Start Scheduler // Start Scheduler
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
@ -753,7 +755,7 @@ func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T)
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
// Start Scheduler // Start Scheduler
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m") ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
@ -816,7 +818,7 @@ func TestLaunchWithHashCollision(t *testing.T) {
informers.Start(stopCh) informers.Start(stopCh)
go dc.Run(1, stopCh) go dc.Run(1, stopCh)
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
// Create single node // Create single node
_, err := nodeClient.Create(newNode("single-node", nil)) _, err := nodeClient.Create(newNode("single-node", nil))
@ -924,7 +926,7 @@ func TestTaintedNode(t *testing.T) {
defer close(stopCh) defer close(stopCh)
// Start Scheduler // Start Scheduler
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
informers.Start(stopCh) informers.Start(stopCh)
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
@ -996,7 +998,7 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
// Start Scheduler // Start Scheduler
setupScheduler(t, clientset, informers, stopCh) defer setupScheduler(t, clientset, informers, stopCh)()
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy

View File

@ -82,7 +82,7 @@ func TestTaintNodeByCondition(t *testing.T) {
admission.SetExternalKubeInformerFactory(externalInformers) admission.SetExternalKubeInformerFactory(externalInformers)
// Apply feature gates to enable TaintNodesByCondition // Apply feature gates to enable TaintNodesByCondition
algorithmprovider.ApplyFeatureGates() defer algorithmprovider.ApplyFeatureGates()()
context = initTestScheduler(t, context, false, nil) context = initTestScheduler(t, context, false, nil)
cs := context.clientSet cs := context.clientSet