Enhance scheduler for TaintNodeByCondition.

This commit is contained in:
Klaus Ma 2017-10-01 08:26:35 +08:00
parent ac33bfd53b
commit bd15efd3e5
10 changed files with 118 additions and 68 deletions

View File

@ -298,6 +298,7 @@ func NewNodeController(
zoneStates: make(map[string]ZoneState), zoneStates: make(map[string]ZoneState),
runTaintManager: runTaintManager, runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager, useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
taintNodeByCondition: taintNodeByCondition,
} }
if useTaintBasedEvictions { if useTaintBasedEvictions {
glog.Infof("Controller is using taint based evictions.") glog.Infof("Controller is using taint based evictions.")
@ -394,6 +395,7 @@ func NewNodeController(
} }
if nc.taintNodeByCondition { if nc.taintNodeByCondition {
glog.Infof("Controller will taint node by condition.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
return nc.doNoScheduleTaintingPass(node) return nc.doNoScheduleTaintingPass(node)
@ -618,7 +620,7 @@ func (nc *Controller) monitorNodeStatus() error {
} }
return false, nil return false, nil
}); err != nil { }); err != nil {
glog.Errorf("Update status of Node %v from Controller error : %v. "+ glog.Errorf("Update status of Node '%v' from Controller error: %v. "+
"Skipping - no pods will be evicted.", node.Name, err) "Skipping - no pods will be evicted.", node.Name, err)
continue continue
} }

View File

@ -35,7 +35,7 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory" "k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"github.com/golang/glog" "github.com/golang/glog"
@ -77,6 +77,9 @@ func Run(s *options.SchedulerServer) error {
// cache only non-terminal pods // cache only non-terminal pods
podInformer := factory.NewPodInformer(kubecli, 0) podInformer := factory.NewPodInformer(kubecli, 0)
// Apply algorithms based on feature gates.
algorithmprovider.ApplyFeatureGates()
sched, err := CreateScheduler( sched, err := CreateScheduler(
s, s,
kubecli, kubecli,

View File

@ -16,7 +16,10 @@ go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["plugins_test.go"], srcs = ["plugins_test.go"],
library = ":go_default_library", library = ":go_default_library",
deps = ["//plugin/pkg/scheduler/factory:go_default_library"], deps = [
"//plugin/pkg/scheduler/factory:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
) )
filegroup( filegroup(

View File

@ -39,7 +39,6 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library",

View File

@ -60,12 +60,7 @@ func init() {
return priorities.PriorityMetadata return priorities.PriorityMetadata
}) })
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
// by specifying flag.
factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())
// Cluster autoscaler friendly scheduling algorithm.
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, defaultPredicates(),
copyAndReplace(defaultPriorities(), "LeastRequestedPriority", "MostRequestedPriority"))
// IMPORTANT NOTES for predicate developers: // IMPORTANT NOTES for predicate developers:
// We are using cached predicate result for pods belonging to the same equivalence class. // We are using cached predicate result for pods belonging to the same equivalence class.
@ -126,7 +121,7 @@ func init() {
} }
func defaultPredicates() sets.String { func defaultPredicates() sets.String {
predSet := sets.NewString( return sets.NewString(
// Fit is determined by volume zone requirements. // Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory( factory.RegisterFitPredicateFactory(
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
@ -182,6 +177,12 @@ func defaultPredicates() sets.String {
// Fit is determined by node disk pressure condition. // Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate), factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
// Fit is determied by node condtions: not ready, network unavailable and out of disk.
factory.RegisterMandatoryFitPredicate("CheckNodeCondition", predicates.CheckNodeConditionPredicate),
// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),
// Fit is determined by volume zone requirements. // Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory( factory.RegisterFitPredicateFactory(
"NoVolumeNodeConflict", "NoVolumeNodeConflict",
@ -190,19 +191,33 @@ func defaultPredicates() sets.String {
}, },
), ),
) )
}
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
predSet := defaultPredicates()
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) { if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
// Remove "CheckNodeCondition" predicate
factory.RemoveFitPredicate("CheckNodeCondition")
predSet.Delete("CheckNodeCondition")
// Fit is determined based on whether a pod can tolerate all of the node's taints // Fit is determined based on whether a pod can tolerate all of the node's taints
predSet.Insert(factory.RegisterMandatoryFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints)) predSet.Insert(factory.RegisterMandatoryFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints))
glog.Warningf("TaintNodesByCondition is enabled, PodToleratesNodeTaints predicate is mandatory") glog.Warningf("TaintNodesByCondition is enabled, PodToleratesNodeTaints predicate is mandatory")
} else {
// Fit is determied by node condtions: not ready, network unavailable and out of disk.
predSet.Insert(factory.RegisterMandatoryFitPredicate("CheckNodeCondition", predicates.CheckNodeConditionPredicate))
// Fit is determined based on whether a pod can tolerate all of the node's taints
predSet.Insert(factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints))
} }
return predSet registerAlgorithmProvider(predSet, defaultPriorities())
}
func registerAlgorithmProvider(predSet, priSet sets.String) {
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
// by specifying flag.
factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
// Cluster autoscaler friendly scheduling algorithm.
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
copyAndReplace(priSet, "LeastRequestedPriority", "MostRequestedPriority"))
} }
func defaultPriorities() sets.String { func defaultPriorities() sets.String {

View File

@ -17,10 +17,10 @@ limitations under the License.
package defaults package defaults
import ( import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"os" "os"
"testing" "testing"
"k8s.io/apimachinery/pkg/util/sets"
) )
func TestGetMaxVols(t *testing.T) { func TestGetMaxVols(t *testing.T) {
@ -106,51 +106,22 @@ func TestDefaultPriorities(t *testing.T) {
} }
func TestDefaultPredicates(t *testing.T) { func TestDefaultPredicates(t *testing.T) {
testCases := []struct { result := sets.NewString(
actionFunc func(value string) error "NoVolumeZoneConflict",
actionParam string "MaxEBSVolumeCount",
expected sets.String "MaxGCEPDVolumeCount",
}{ "MaxAzureDiskVolumeCount",
{ "MatchInterPodAffinity",
actionFunc: utilfeature.DefaultFeatureGate.Set, "NoDiskConflict",
actionParam: "TaintNodesByCondition=true", "GeneralPredicates",
expected: sets.NewString( "CheckNodeMemoryPressure",
"NoVolumeZoneConflict", "CheckNodeDiskPressure",
"MaxEBSVolumeCount", "NoVolumeNodeConflict",
"MaxGCEPDVolumeCount", "CheckNodeCondition",
"MaxAzureDiskVolumeCount", "PodToleratesNodeTaints",
"MatchInterPodAffinity", )
"NoDiskConflict",
"GeneralPredicates", if expected := defaultPredicates(); !result.Equal(expected) {
"CheckNodeMemoryPressure", t.Errorf("expected %v got %v", expected, result)
"CheckNodeDiskPressure",
"NoVolumeNodeConflict",
"PodToleratesNodeTaints",
),
},
{
actionFunc: utilfeature.DefaultFeatureGate.Set,
actionParam: "TaintNodesByCondition=false",
expected: sets.NewString(
"NoVolumeZoneConflict",
"MaxEBSVolumeCount",
"MaxGCEPDVolumeCount",
"MaxAzureDiskVolumeCount",
"MatchInterPodAffinity",
"NoDiskConflict",
"GeneralPredicates",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"NoVolumeNodeConflict",
"CheckNodeCondition",
"PodToleratesNodeTaints",
),
},
}
for _, testCase := range testCases {
testCase.actionFunc(testCase.actionParam)
if result := defaultPredicates(); !result.Equal(testCase.expected) {
t.Errorf("expected %v got %v", testCase.expected, result)
}
} }
} }

View File

@ -17,6 +17,10 @@ limitations under the License.
package algorithmprovider package algorithmprovider
import ( import (
// Import defaults of algorithmprovider for initialization. "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
) )
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
defaults.ApplyFeatureGates()
}

View File

@ -19,6 +19,7 @@ package algorithmprovider
import ( import (
"testing" "testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory" "k8s.io/kubernetes/plugin/pkg/scheduler/factory"
) )
@ -63,3 +64,46 @@ func TestAlgorithmProviders(t *testing.T) {
} }
} }
} }
func TestApplyFeatureGates(t *testing.T) {
for _, pn := range algorithmProviderNames {
p, err := factory.GetAlgorithmProvider(pn)
if err != nil {
t.Errorf("Error retrieving '%s' provider: %v", pn, err)
break
}
if !p.FitPredicateKeys.Has("CheckNodeCondition") {
t.Errorf("Failed to find predicate: 'CheckNodeCondition'")
break
}
if !p.FitPredicateKeys.Has("PodToleratesNodeTaints") {
t.Errorf("Failed to find predicate: 'PodToleratesNodeTaints'")
break
}
}
// Apply features for algorithm providers.
utilfeature.DefaultFeatureGate.Set("TaintNodesByCondition=True")
ApplyFeatureGates()
for _, pn := range algorithmProviderNames {
p, err := factory.GetAlgorithmProvider(pn)
if err != nil {
t.Errorf("Error retrieving '%s' provider: %v", pn, err)
break
}
if !p.FitPredicateKeys.Has("PodToleratesNodeTaints") {
t.Errorf("Failed to find predicate: 'PodToleratesNodeTaints'")
break
}
if p.FitPredicateKeys.Has("CheckNodeCondition") {
t.Errorf("Unexpected predicate: 'CheckNodeCondition'")
break
}
}
}

View File

@ -207,7 +207,6 @@ func NewConfigFactory(
// they may need to call. // they may need to call.
c.scheduledPodLister = assignedPodLister{podInformer.Lister()} c.scheduledPodLister = assignedPodLister{podInformer.Lister()}
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeInformer.Informer().AddEventHandler( nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache, AddFunc: c.addNodeToCache,

View File

@ -105,6 +105,16 @@ func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string
return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate }) return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate })
} }
// RemoveFitPredicate removes a fit predicate from factory.
func RemoveFitPredicate(name string) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
delete(fitPredicateMap, name)
mandatoryFitPredicates.Delete(name)
}
// RegisterMandatoryFitPredicate registers a fit predicate with the algorithm registry, the predicate is used by // RegisterMandatoryFitPredicate registers a fit predicate with the algorithm registry, the predicate is used by
// kubelet, DaemonSet; it is always included in configuration. Returns the name with which the predicate was // kubelet, DaemonSet; it is always included in configuration. Returns the name with which the predicate was
// registered. // registered.