Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-03 17:30:00 +00:00)
Merge pull request #62243 from resouer/fix-62068
Automatic merge from submit-queue (batch tested with PRs 59592, 62308, 62523, 62635, 62243). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Separate pod priority from preemption

**What this PR does / why we need it**: Users have asked to split the priority and preemption feature gates so that priority can be used on its own.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #62068

**Special notes for your reviewer**:

~~I kept `ENABLE_POD_PRIORITY` as the env var name in the GCE cluster scripts for backward compatibility. Please let me know if another approach is preferred.~~

~~This is a potential **breaking change**, since existing clusters will be affected; we may need to target 1.11 for this.~~

TODO: update this doc https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/

[Update] Usage, in the scheduler config file:

```yaml
apiVersion: componentconfig/v1alpha1
kind: KubeSchedulerConfiguration
...
disablePreemption: true
```

**Release note**:

```release-note
Split PodPriority and PodPreemption feature gate
```
This commit is contained in: commit 1e39d68ecb
```diff
@@ -419,6 +419,8 @@ type SchedulerServer struct {
 	HealthzServer *http.Server
 	// MetricsServer is optional.
 	MetricsServer *http.Server
+	// Disable pod preemption or not.
+	DisablePreemption bool
 }
 
 // NewSchedulerServer creates a runnable SchedulerServer from configuration.
@@ -483,6 +485,7 @@ func NewSchedulerServer(config *componentconfig.KubeSchedulerConfiguration, mast
 		LeaderElection: leaderElectionConfig,
 		HealthzServer: healthzServer,
 		MetricsServer: metricsServer,
+		DisablePreemption: config.DisablePreemption,
 	}, nil
 }
 
@@ -697,6 +700,7 @@ func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
 		storageClassInformer,
 		s.HardPodAffinitySymmetricWeight,
 		utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
+		s.DisablePreemption,
 	)
 
 	source := s.AlgorithmSource
@@ -754,5 +758,7 @@ func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
 	}
 	// Additional tweaks to the config produced by the configurator.
 	config.Recorder = s.Recorder
+
+	config.DisablePreemption = s.DisablePreemption
 	return config, nil
 }
```
```diff
@@ -111,6 +111,9 @@ type KubeSchedulerConfiguration struct {
 	// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
 	// DEPRECATED: This is no longer used.
 	FailureDomains string
+
+	// DisablePreemption disables the pod preemption feature.
+	DisablePreemption bool
 }
 
 // KubeSchedulerLeaderElectionConfiguration expands LeaderElectionConfiguration
```
```diff
@@ -106,6 +106,9 @@ type KubeSchedulerConfiguration struct {
 
 	// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
 	FailureDomains string `json:"failureDomains"`
+
+	// DisablePreemption disables the pod preemption feature.
+	DisablePreemption bool `json:"disablePreemption"`
 }
 
 // LeaderElectionConfiguration defines the configuration of leader election
```
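As a quick illustration of how the new versioned field surfaces in a serialized config, here is a minimal, self-contained sketch; `kubeSchedulerConfiguration` below is a stand-in for the real v1alpha1 type, reproducing only the field added by this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kubeSchedulerConfiguration is an illustrative stand-in for the v1alpha1 type
// touched above; only the new field is reproduced to show how the json tag
// appears in a serialized scheduler config.
type kubeSchedulerConfiguration struct {
	DisablePreemption bool `json:"disablePreemption"`
}

func main() {
	b, err := json.Marshal(kubeSchedulerConfiguration{DisablePreemption: true})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"disablePreemption":true}
}
```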
```diff
@@ -675,6 +675,7 @@ func autoConvert_v1alpha1_KubeSchedulerConfiguration_To_componentconfig_KubeSche
 	out.EnableProfiling = in.EnableProfiling
 	out.EnableContentionProfiling = in.EnableContentionProfiling
 	out.FailureDomains = in.FailureDomains
+	out.DisablePreemption = in.DisablePreemption
 	return nil
 }
 
@@ -700,6 +701,7 @@ func autoConvert_componentconfig_KubeSchedulerConfiguration_To_v1alpha1_KubeSche
 	out.EnableProfiling = in.EnableProfiling
 	out.EnableContentionProfiling = in.EnableContentionProfiling
 	out.FailureDomains = in.FailureDomains
+	out.DisablePreemption = in.DisablePreemption
 	return nil
 }
 
```
```diff
@@ -579,6 +579,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 		informerFactory.Storage().V1().StorageClasses(),
 		v1.DefaultHardPodAffinitySymmetricWeight,
 		enableEquivalenceCache,
+		false,
 	).CreateFromConfig(policy); err != nil {
 		t.Errorf("%s: Error constructing: %v", v, err)
 		continue
```
```diff
@@ -506,7 +506,18 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 		}
 		queue := NewSchedulingQueue()
 		scheduler := NewGenericScheduler(
-			cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyPriorityMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
+			cache,
+			nil,
+			queue,
+			test.predicates,
+			algorithm.EmptyPredicateMetadataProducer,
+			test.prioritizers,
+			algorithm.EmptyPriorityMetadataProducer,
+			extenders,
+			nil,
+			schedulertesting.FakePersistentVolumeClaimLister{},
+			false,
+			false)
 		podIgnored := &v1.Pod{}
 		machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 		if test.expectsErr {
```
```diff
@@ -98,6 +98,7 @@ type genericScheduler struct {
 	cachedNodeInfoMap map[string]*schedulercache.NodeInfo
 	volumeBinder *volumebinder.VolumeBinder
 	pvcLister corelisters.PersistentVolumeClaimLister
+	disablePreemption bool
 }
 
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
@@ -1107,7 +1108,9 @@ func NewGenericScheduler(
 	extenders []algorithm.SchedulerExtender,
 	volumeBinder *volumebinder.VolumeBinder,
 	pvcLister corelisters.PersistentVolumeClaimLister,
-	alwaysCheckAllPredicates bool) algorithm.ScheduleAlgorithm {
+	alwaysCheckAllPredicates bool,
+	disablePreemption bool,
+) algorithm.ScheduleAlgorithm {
 	return &genericScheduler{
 		cache: cache,
 		equivalenceCache: eCache,
@@ -1121,5 +1124,6 @@ func NewGenericScheduler(
 		volumeBinder: volumeBinder,
 		pvcLister: pvcLister,
 		alwaysCheckAllPredicates: alwaysCheckAllPredicates,
+		disablePreemption: disablePreemption,
 	}
 }
```
```diff
@@ -409,7 +409,18 @@ func TestGenericScheduler(t *testing.T) {
 		pvcLister := schedulertesting.FakePersistentVolumeClaimLister(pvcs)
 
 		scheduler := NewGenericScheduler(
-			cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyPriorityMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister, test.alwaysCheckAllPredicates)
+			cache,
+			nil,
+			NewSchedulingQueue(),
+			test.predicates,
+			algorithm.EmptyPredicateMetadataProducer,
+			test.prioritizers,
+			algorithm.EmptyPriorityMetadataProducer,
+			[]algorithm.SchedulerExtender{},
+			nil,
+			pvcLister,
+			test.alwaysCheckAllPredicates,
+			false)
 		machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 
 		if !reflect.DeepEqual(err, test.wErr) {
```
```diff
@@ -1323,7 +1334,18 @@ func TestPreempt(t *testing.T) {
 			extenders = append(extenders, extender)
 		}
 		scheduler := NewGenericScheduler(
-			cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyPriorityMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
+			cache,
+			nil,
+			NewSchedulingQueue(),
+			map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
+			algorithm.EmptyPredicateMetadataProducer,
+			[]algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
+			algorithm.EmptyPriorityMetadataProducer,
+			extenders,
+			nil,
+			schedulertesting.FakePersistentVolumeClaimLister{},
+			false,
+			false)
 		// Call Preempt and check the expected results.
 		node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 		if err != nil {
```
```diff
@@ -133,6 +133,9 @@ type configFactory struct {
 
 	// always check all predicates even if the middle of one predicate fails.
 	alwaysCheckAllPredicates bool
+
+	// Disable pod preemption or not.
+	disablePreemption bool
 }
 
 // NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
@@ -152,6 +155,7 @@ func NewConfigFactory(
 	storageClassInformer storageinformers.StorageClassInformer,
 	hardPodAffinitySymmetricWeight int32,
 	enableEquivalenceClassCache bool,
+	disablePreemption bool,
 ) scheduler.Configurator {
 	stopEverything := make(chan struct{})
 	schedulerCache := schedulercache.New(30*time.Second, stopEverything)
@@ -179,6 +183,7 @@ func NewConfigFactory(
 		schedulerName: schedulerName,
 		hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
 		enableEquivalenceClassCache: enableEquivalenceClassCache,
+		disablePreemption: disablePreemption,
 	}
 
 	c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
@@ -1064,7 +1069,20 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		glog.Info("Created equivalence class cache")
 	}
 
-	algo := core.NewGenericScheduler(c.schedulerCache, c.equivalencePodCache, c.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, c.volumeBinder, c.pVCLister, c.alwaysCheckAllPredicates)
+	algo := core.NewGenericScheduler(
+		c.schedulerCache,
+		c.equivalencePodCache,
+		c.podQueue,
+		predicateFuncs,
+		predicateMetaProducer,
+		priorityConfigs,
+		priorityMetaProducer,
+		extenders,
+		c.volumeBinder,
+		c.pVCLister,
+		c.alwaysCheckAllPredicates,
+		c.disablePreemption,
+	)
 
 	podBackoff := util.CreateDefaultPodBackoff()
 	return &scheduler.Config{
```
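Taken together, these hunks thread the flag from the factory constructor into the algorithm it builds. The following self-contained sketch mirrors that plumbing with illustrative stand-in types (not the scheduler's real API), just to make the data flow explicit.

```go
package main

import "fmt"

// genericScheduler and configFactory are illustrative stand-ins: the sketch
// only shows the new option being stored by the factory constructor and then
// handed to the algorithm it creates, mirroring the factory.go hunks above.
type genericScheduler struct {
	disablePreemption bool
}

type configFactory struct {
	disablePreemption bool
}

func newConfigFactory(disablePreemption bool) *configFactory {
	return &configFactory{disablePreemption: disablePreemption}
}

// createAlgorithm mirrors CreateFromKeys passing c.disablePreemption along.
func (c *configFactory) createAlgorithm() *genericScheduler {
	return &genericScheduler{disablePreemption: c.disablePreemption}
}

func main() {
	algo := newConfigFactory(true).createAlgorithm()
	fmt.Println(algo.disablePreemption) // true
}
```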
```diff
@@ -46,7 +46,10 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
-const enableEquivalenceCache = true
+const (
+	enableEquivalenceCache = true
+	disablePodPreemption = false
+)
 
 func TestCreate(t *testing.T) {
 	handler := utiltesting.FakeHandler{
@@ -533,6 +536,7 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh
 		informerFactory.Storage().V1().StorageClasses(),
 		hardPodAffinitySymmetricWeight,
 		enableEquivalenceCache,
+		disablePodPreemption,
 	)
 }
 
```
```diff
@@ -137,6 +137,9 @@ type Config struct {
 
 	// VolumeBinder handles PVC/PV binding for the pod.
 	VolumeBinder *volumebinder.VolumeBinder
+
+	// Disable pod preemption or not.
+	DisablePreemption bool
 }
 
 // NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.
@@ -207,8 +210,9 @@ func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
 // If it succeeds, it adds the name of the node where preemption has happened to the pod annotations.
 // It returns the node name and an error if any.
 func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
-	if !util.PodPriorityEnabled() {
-		glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
+	if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
+		glog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
+			" No preemption is performed.")
 		return "", nil
 	}
 	preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
```
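This guard is the core of the split: priority remains governed by the PodPriority feature gate, while preemption can now be switched off independently through the scheduler config. A minimal sketch of the resulting decision table, restated as a pure function (the names here are illustrative; the real code checks `util.PodPriorityEnabled()` and `sched.config.DisablePreemption` inline):

```go
package main

import "fmt"

// shouldPreempt restates the guard added to (*Scheduler).preempt: preemption
// runs only when the PodPriority feature gate is on AND preemption has not
// been disabled in the scheduler configuration.
func shouldPreempt(podPriorityEnabled, disablePreemption bool) bool {
	return podPriorityEnabled && !disablePreemption
}

func main() {
	cases := []struct{ priority, disable bool }{
		{true, false},  // default: priority on, preemption on
		{true, true},   // new with this PR: priority without preemption
		{false, false}, // gate off: no preemption regardless
		{false, true},
	}
	for _, c := range cases {
		fmt.Printf("PodPriority=%v disablePreemption=%v -> preempt=%v\n",
			c.priority, c.disable, shouldPreempt(c.priority, c.disable))
	}
}
```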
```diff
@@ -548,6 +548,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
 		[]algorithm.SchedulerExtender{},
 		nil,
 		schedulertesting.FakePersistentVolumeClaimLister{},
+		false,
 		false)
 	bindingChan := make(chan *v1.Binding, 1)
 	errChan := make(chan error, 1)
@@ -596,6 +597,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 		[]algorithm.SchedulerExtender{},
 		nil,
 		schedulertesting.FakePersistentVolumeClaimLister{},
+		false,
 		false)
 	bindingChan := make(chan *v1.Binding, 2)
 	configurator := &FakeConfigurator{
```
```diff
@@ -41,8 +41,8 @@ import (
 
 var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
 
-func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
-	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+func waitForNominatedNodeNameWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
+	if err := wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
 		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
 		if err != nil {
 			return false, err
@@ -57,6 +57,10 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
 	return nil
 }
 
+func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
+	return waitForNominatedNodeNameWithTimeout(cs, pod, wait.ForeverTestTimeout)
+}
+
 // TestPreemption tests a few preemption scenarios.
 func TestPreemption(t *testing.T) {
 	// Enable PodPriority feature gate.
```
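The refactor above keeps the original helper's signature and turns it into a thin wrapper over a `*WithTimeout` variant, so a test that expects the condition *not* to happen can poll with a short timeout instead of `wait.ForeverTestTimeout`. A self-contained sketch of the same pattern, with illustrative names rather than the test's actual API:

```go
package main

import (
	"fmt"
	"time"
)

// waitForCondition keeps the old entry point and delegates to the variant
// that exposes the timeout knob, mirroring waitForNominatedNodeName above.
func waitForCondition(check func() bool) bool {
	return waitForConditionWithTimeout(check, 30*time.Second)
}

func waitForConditionWithTimeout(check func() bool, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if check() {
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}

func main() {
	// Negative-style check: expect the condition to stay false within a short window.
	fmt.Println(waitForConditionWithTimeout(func() bool { return false }, time.Second))
}
```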
```diff
@@ -285,6 +289,88 @@ func TestPreemption(t *testing.T) {
 	}
 }
 
+// TestDisablePreemption tests disable pod preemption of scheduler works as expected.
+func TestDisablePreemption(t *testing.T) {
+	// Enable PodPriority feature gate.
+	utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.PodPriority))
+	// Initialize scheduler, and disable preemption.
+	context := initTestDisablePreemption(t, "disable-preemption")
+	defer cleanupTest(t, context)
+	cs := context.clientSet
+
+	tests := []struct {
+		description string
+		existingPods []*v1.Pod
+		pod *v1.Pod
+	}{
+		{
+			description: "pod preemption will not happen",
+			existingPods: []*v1.Pod{
+				initPausePod(context.clientSet, &pausePodConfig{
+					Name: "victim-pod",
+					Namespace: context.ns.Name,
+					Priority: &lowPriority,
+					Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+						v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
+						v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+					},
+				}),
+			},
+			pod: initPausePod(cs, &pausePodConfig{
+				Name: "preemptor-pod",
+				Namespace: context.ns.Name,
+				Priority: &highPriority,
+				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+					v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
+					v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+				},
+			}),
+		},
+	}
+
+	// Create a node with some resources and a label.
+	nodeRes := &v1.ResourceList{
+		v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
+		v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+	}
+	_, err := createNode(context.clientSet, "node1", nodeRes)
+	if err != nil {
+		t.Fatalf("Error creating nodes: %v", err)
+	}
+
+	for _, test := range tests {
+		pods := make([]*v1.Pod, len(test.existingPods))
+		// Create and run existingPods.
+		for i, p := range test.existingPods {
+			pods[i], err = runPausePod(cs, p)
+			if err != nil {
+				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
+			}
+		}
+		// Create the "pod".
+		preemptor, err := createPausePod(cs, test.pod)
+		if err != nil {
+			t.Errorf("Error while creating high priority pod: %v", err)
+		}
+		// Ensure preemptor should keep unschedulable.
+		if err := waitForPodUnschedulable(cs, preemptor); err != nil {
+			t.Errorf("Test [%v]: Preemptor %v should not become scheduled",
+				test.description, preemptor.Name)
+		}
+
+		// Ensure preemptor should not be nominated.
+		if err := waitForNominatedNodeNameWithTimeout(cs, preemptor, 5*time.Second); err == nil {
+			t.Errorf("Test [%v]: Preemptor %v should not be nominated",
+				test.description, preemptor.Name)
+		}
+
+		// Cleanup
+		pods = append(pods, preemptor)
+		cleanupPods(cs, t, pods)
+	}
+}
+
 func mkPriorityPodWithGrace(tc *TestContext, name string, priority int32, grace int64) *v1.Pod {
 	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
 		v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
```
```diff
@@ -63,7 +63,7 @@ type TestContext struct {
 	scheduler *scheduler.Scheduler
 }
 
-// createConfiguratorWithPodInformer create a configurator for scheduler with given informer factory, custom name and pod informer.
+// createConfiguratorWithPodInformer creates a configurator for scheduler.
 func createConfiguratorWithPodInformer(
 	schedulerName string,
 	clientSet clientset.Interface,
@@ -85,6 +85,7 @@ func createConfiguratorWithPodInformer(
 		informerFactory.Storage().V1().StorageClasses(),
 		v1.DefaultHardPodAffinitySymmetricWeight,
 		utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
+		false,
 	)
 }
 
```
```diff
@@ -115,7 +116,14 @@ func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface
 	}
 
 	// 2. Create kubeclient
-	context.clientSet = clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
+	context.clientSet = clientset.NewForConfigOrDie(
+		&restclient.Config{
+			QPS: -1, Host: s.URL,
+			ContentConfig: restclient.ContentConfig{
+				GroupVersion: testapi.Groups[v1.GroupName].GroupVersion(),
+			},
+		},
+	)
 	return &context
 }
 
```
```diff
@@ -127,6 +135,21 @@ func initTestScheduler(
 	controllerCh chan struct{},
 	setPodInformer bool,
 	policy *schedulerapi.Policy,
+) *TestContext {
+	// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
+	// feature gate is enabled at the same time.
+	return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false)
+}
+
+// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
+// configuration and other options.
+func initTestSchedulerWithOptions(
+	t *testing.T,
+	context *TestContext,
+	controllerCh chan struct{},
+	setPodInformer bool,
+	policy *schedulerapi.Policy,
+	disablePreemption bool,
 ) *TestContext {
 	// Enable EnableEquivalenceClassCache for all integration tests.
 	defer utilfeaturetesting.SetFeatureGateDuringTest(
```
```diff
@@ -166,19 +189,29 @@ func initTestScheduler(
 		context.schedulerConfig.StopEverything = controllerCh
 	}
 
+	// set DisablePreemption option
+	context.schedulerConfig.DisablePreemption = disablePreemption
+
 	// set setPodInformer if provided.
 	if setPodInformer {
 		go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
 	}
 
 	eventBroadcaster := record.NewBroadcaster()
-	context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
-	eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: context.clientSet.CoreV1().Events("")})
+	context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
+		legacyscheme.Scheme,
+		v1.EventSource{Component: v1.DefaultSchedulerName},
+	)
+	eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
+		Interface: context.clientSet.CoreV1().Events(""),
+	})
 
 	context.informerFactory.Start(context.schedulerConfig.StopEverything)
 	context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything)
 
-	context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: context.schedulerConfig}, nil...)
+	context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{
+		Config: context.schedulerConfig},
+		nil...)
 	if err != nil {
 		t.Fatalf("Couldn't create scheduler: %v", err)
 	}
```
```diff
@@ -192,6 +225,13 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
 	return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), nil, true, nil)
 }
 
+// initTestDisablePreemption initializes a test environment and creates master and scheduler with default
+// configuration but with pod preemption disabled.
+func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
+	return initTestSchedulerWithOptions(
+		t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true)
+}
+
 // cleanupTest deletes the scheduler and the test namespace. It should be called
 // at the end of a test.
 func cleanupTest(t *testing.T, context *TestContext) {
```
```diff
@@ -205,7 +245,8 @@ func cleanupTest(t *testing.T, context *TestContext) {
 
 // waitForReflection waits till the passFunc confirms that the object it expects
 // to see is in the store. Used to observe reflected events.
-func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, passFunc func(n interface{}) bool) error {
+func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
+	passFunc func(n interface{}) bool) error {
 	nodes := []*v1.Node{}
 	err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
 		n, err := nodeLister.Get(key)
@@ -344,7 +385,8 @@ func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
 // createPausePodWithResource creates a pod with "Pause" image and the given
 // resources and returns its pointer and error status. The resource list can be
 // nil.
-func createPausePodWithResource(cs clientset.Interface, podName string, nsName string, res *v1.ResourceList) (*v1.Pod, error) {
+func createPausePodWithResource(cs clientset.Interface, podName string,
+	nsName string, res *v1.ResourceList) (*v1.Pod, error) {
 	var conf pausePodConfig
 	if res == nil {
 		conf = pausePodConfig{
@@ -438,7 +480,8 @@ func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.
 			return false, nil
 		}
 		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
-		return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable, nil
+		return cond != nil && cond.Status == v1.ConditionFalse &&
+			cond.Reason == v1.PodReasonUnschedulable, nil
 	}
 }
 
@@ -480,7 +523,8 @@ func cleanupPods(cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
 		}
 	}
 	for _, p := range pods {
-		if err := wait.Poll(time.Second, wait.ForeverTestTimeout, podDeleted(cs, p.Namespace, p.Name)); err != nil {
+		if err := wait.Poll(time.Second, wait.ForeverTestTimeout,
+			podDeleted(cs, p.Namespace, p.Name)); err != nil {
 			t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err)
 		}
 	}
```
```diff
@@ -112,5 +112,6 @@ func createSchedulerConfigurator(
 		informerFactory.Storage().V1().StorageClasses(),
 		v1.DefaultHardPodAffinitySymmetricWeight,
 		utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
+		false,
 	)
 }
```