mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-29 21:29:24 +00:00
Merge pull request #63223 from k82cn/kep548_working
Automatic merge from submit-queue (batch tested with PRs 64057, 63223, 64346, 64562, 64408). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Schedule DaemonSet Pods in scheduler. Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com> **What this PR does / why we need it**: **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: part of #59194 **Special notes for your reviewer**: **Release note**: ```release-note `ScheduleDaemonSetPods` is an alpha feature (since v1.11) that causes DaemonSet Pods to be scheduler by default scheduler, instead of Daemonset controller. When it is enabled, the `NodeAffinity` term (instead of `.spec.nodeName`) is added to the DaemonSet Pods; this enables the default scheduler to bind the Pod to the target host. If node affinity of DaemonSet Pod already exists, it will be replaced. DaemonSet controller will only perform these operations when creating DaemonSet Pods; and those operations will only modify the Pods of DaemonSet, no changes are made to the `.spec.template` of DaemonSet. ```
This commit is contained in:
@@ -14,23 +14,32 @@ go_test(
|
||||
],
|
||||
tags = ["integration"],
|
||||
deps = [
|
||||
"//pkg/api/legacyscheme:go_default_library",
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/controller/daemon:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/scheduler:go_default_library",
|
||||
"//pkg/scheduler/algorithm:go_default_library",
|
||||
"//pkg/scheduler/algorithmprovider:go_default_library",
|
||||
"//pkg/scheduler/factory:go_default_library",
|
||||
"//pkg/util/metrics:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
"//vendor/k8s.io/api/apps/v1:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -24,19 +24,30 @@ import (
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller/daemon"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
@@ -69,6 +80,62 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS
|
||||
return server, closeFn, dc, informers, clientSet
|
||||
}
|
||||
|
||||
func setupScheduler(
|
||||
t *testing.T,
|
||||
cs clientset.Interface,
|
||||
informerFactory informers.SharedInformerFactory,
|
||||
stopCh chan struct{},
|
||||
) {
|
||||
// If ScheduleDaemonSetPods is disabled, do not start scheduler.
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
|
||||
return
|
||||
}
|
||||
|
||||
schedulerConfigFactory := factory.NewConfigFactory(
|
||||
v1.DefaultSchedulerName,
|
||||
cs,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
||||
informerFactory.Storage().V1().StorageClasses(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
true,
|
||||
false,
|
||||
)
|
||||
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create scheduler config: %v", err)
|
||||
}
|
||||
|
||||
schedulerConfig.StopEverything = stopCh
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
|
||||
legacyscheme.Scheme,
|
||||
v1.EventSource{Component: v1.DefaultSchedulerName},
|
||||
)
|
||||
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
|
||||
Interface: cs.CoreV1().Events(""),
|
||||
})
|
||||
|
||||
sched, err := scheduler.NewFromConfigurator(
|
||||
&scheduler.FakeConfigurator{Config: schedulerConfig}, nil...)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating scheduler: %v", err)
|
||||
}
|
||||
|
||||
algorithmprovider.ApplyFeatureGates()
|
||||
|
||||
go sched.Run()
|
||||
}
|
||||
|
||||
func testLabels() map[string]string {
|
||||
return map[string]string{"name": "test"}
|
||||
}
|
||||
@@ -162,6 +229,12 @@ func updateStrategies() []*apps.DaemonSetUpdateStrategy {
|
||||
return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
|
||||
}
|
||||
|
||||
func featureGates() []utilfeature.Feature {
|
||||
return []utilfeature.Feature{
|
||||
features.ScheduleDaemonSetPods,
|
||||
}
|
||||
}
|
||||
|
||||
func allocatableResources(memory, cpu string) v1.ResourceList {
|
||||
return v1.ResourceList{
|
||||
v1.ResourceMemory: resource.MustParse(memory),
|
||||
@@ -242,7 +315,7 @@ func validateDaemonSetPodsAndMarkReady(
|
||||
t.Errorf("controllerRef.Controller is not set to true")
|
||||
}
|
||||
|
||||
if !podutil.IsPodReady(pod) {
|
||||
if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
|
||||
podCopy := pod.DeepCopy()
|
||||
podCopy.Status = v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
@@ -261,6 +334,44 @@ func validateDaemonSetPodsAndMarkReady(
|
||||
}
|
||||
}
|
||||
|
||||
// podUnschedulable returns a condition function that returns true if the given pod
|
||||
// gets unschedulable status.
|
||||
func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
if err != nil {
|
||||
// This could be a connection error so we want to retry.
|
||||
return false, nil
|
||||
}
|
||||
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
|
||||
return cond != nil && cond.Status == v1.ConditionFalse &&
|
||||
cond.Reason == v1.PodReasonUnschedulable, nil
|
||||
}
|
||||
}
|
||||
|
||||
// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
|
||||
// an error if it does not become unschedulable within the given timeout.
|
||||
func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
|
||||
return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
|
||||
}
|
||||
|
||||
// waitForPodUnschedule waits for a pod to fail scheduling and returns
|
||||
// an error if it does not become unschedulable within the timeout duration (30 seconds).
|
||||
func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
|
||||
return waitForPodUnschedulableWithTimeout(cs, pod, 10*time.Second)
|
||||
}
|
||||
|
||||
// waitForPodsCreated waits for number of pods are created.
|
||||
func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
|
||||
return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
|
||||
objects := podInformer.GetIndexer().List()
|
||||
return len(objects) == num, nil
|
||||
})
|
||||
}
|
||||
|
||||
func validateDaemonSetStatus(
|
||||
dsClient appstyped.DaemonSetInterface,
|
||||
dsName string,
|
||||
@@ -302,6 +413,22 @@ func validateFailedPlacementEvent(eventClient corev1typed.EventInterface, t *tes
|
||||
}
|
||||
}
|
||||
|
||||
func forEachFeatureGate(t *testing.T, tf func(t *testing.T)) {
|
||||
for _, fg := range featureGates() {
|
||||
func() {
|
||||
enabled := utilfeature.DefaultFeatureGate.Enabled(fg)
|
||||
defer func() {
|
||||
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, enabled))
|
||||
}()
|
||||
|
||||
for _, f := range []bool{true, false} {
|
||||
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, f))
|
||||
t.Run(fmt.Sprintf("%v (%t)", fg, f), tf)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
|
||||
@@ -310,69 +437,152 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe
|
||||
}
|
||||
|
||||
func TestOneNodeDaemonLaunchesPod(t *testing.T) {
|
||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||
server, closeFn, dc, informers, clientset := setup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
forEachFeatureGate(t, func(t *testing.T) {
|
||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||
server, closeFn, dc, informers, clientset := setup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
|
||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
||||
podClient := clientset.CoreV1().Pods(ns.Name)
|
||||
nodeClient := clientset.CoreV1().Nodes()
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
||||
podClient := clientset.CoreV1().Pods(ns.Name)
|
||||
nodeClient := clientset.CoreV1().Nodes()
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
_, err := dsClient.Create(ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
defer cleanupDaemonSets(t, clientset, ds)
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
|
||||
_, err = nodeClient.Create(newNode("single-node", nil))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create node: %v", err)
|
||||
}
|
||||
// Start Scheduler
|
||||
setupScheduler(t, clientset, informers, stopCh)
|
||||
|
||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
|
||||
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
_, err := dsClient.Create(ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
defer cleanupDaemonSets(t, clientset, ds)
|
||||
|
||||
_, err = nodeClient.Create(newNode("single-node", nil))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create node: %v", err)
|
||||
}
|
||||
|
||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
|
||||
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
|
||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||
server, closeFn, dc, informers, clientset := setup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
forEachFeatureGate(t, func(t *testing.T) {
|
||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||
server, closeFn, dc, informers, clientset := setup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
|
||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
||||
podClient := clientset.CoreV1().Pods(ns.Name)
|
||||
nodeClient := clientset.CoreV1().Nodes()
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
||||
podClient := clientset.CoreV1().Pods(ns.Name)
|
||||
nodeClient := clientset.CoreV1().Nodes()
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
_, err := dsClient.Create(ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
defer cleanupDaemonSets(t, clientset, ds)
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
|
||||
addNodes(nodeClient, 0, 5, nil, t)
|
||||
// Start Scheduler
|
||||
setupScheduler(t, clientset, informers, stopCh)
|
||||
|
||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
|
||||
validateDaemonSetStatus(dsClient, ds.Name, 5, t)
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
_, err := dsClient.Create(ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
defer cleanupDaemonSets(t, clientset, ds)
|
||||
|
||||
addNodes(nodeClient, 0, 5, nil, t)
|
||||
|
||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
|
||||
validateDaemonSetStatus(dsClient, ds.Name, 5, t)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
|
||||
forEachFeatureGate(t, func(t *testing.T) {
|
||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||
server, closeFn, dc, informers, clientset := setup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
|
||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
||||
podClient := clientset.CoreV1().Pods(ns.Name)
|
||||
nodeClient := clientset.CoreV1().Nodes()
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
|
||||
// Start Scheduler
|
||||
setupScheduler(t, clientset, informers, stopCh)
|
||||
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
|
||||
ds.Spec.Template.Spec.Affinity = &v1.Affinity{
|
||||
NodeAffinity: &v1.NodeAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
|
||||
NodeSelectorTerms: []v1.NodeSelectorTerm{
|
||||
{
|
||||
MatchExpressions: []v1.NodeSelectorRequirement{
|
||||
{
|
||||
Key: "zone",
|
||||
Operator: v1.NodeSelectorOpIn,
|
||||
Values: []string{"test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
MatchFields: []v1.NodeSelectorRequirement{
|
||||
{
|
||||
Key: algorithm.NodeFieldSelectorKeyNodeName,
|
||||
Operator: v1.NodeSelectorOpIn,
|
||||
Values: []string{"node-1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := dsClient.Create(ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
defer cleanupDaemonSets(t, clientset, ds)
|
||||
|
||||
addNodes(nodeClient, 0, 2, nil, t)
|
||||
// Two nodes with labels
|
||||
addNodes(nodeClient, 2, 2, map[string]string{
|
||||
"zone": "test",
|
||||
}, t)
|
||||
addNodes(nodeClient, 4, 2, nil, t)
|
||||
|
||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
|
||||
validateDaemonSetStatus(dsClient, ds.Name, 3, t)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -389,9 +599,13 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
// Start Scheduler
|
||||
setupScheduler(t, clientset, informers, stopCh)
|
||||
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
@@ -399,6 +613,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
|
||||
defer cleanupDaemonSets(t, clientset, ds)
|
||||
|
||||
node := newNode("single-node", nil)
|
||||
@@ -427,9 +642,10 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
|
||||
eventClient := clientset.CoreV1().Events(ns.Namespace)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m")
|
||||
@@ -450,3 +666,77 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
|
||||
validateFailedPlacementEvent(eventClient, t)
|
||||
})
|
||||
}
|
||||
|
||||
// TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests that when "ScheduleDaemonSetPods"
|
||||
// feature is enabled, the DaemonSet should create Pods for all the nodes regardless of available resource
|
||||
// on the nodes, and kube-scheduler should not schedule Pods onto the nodes with insufficient resource.
|
||||
func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) {
|
||||
enabled := utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods)
|
||||
defer func() {
|
||||
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t",
|
||||
features.ScheduleDaemonSetPods, enabled))
|
||||
}()
|
||||
|
||||
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.ScheduleDaemonSetPods, true))
|
||||
|
||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||
server, closeFn, dc, informers, clientset := setup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
|
||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
||||
podClient := clientset.CoreV1().Pods(ns.Name)
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
nodeClient := clientset.CoreV1().Nodes()
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
|
||||
// Start Scheduler
|
||||
setupScheduler(t, clientset, informers, stopCh)
|
||||
|
||||
ds := newDaemonSet("foo", ns.Name)
|
||||
ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds, err := dsClient.Create(ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
|
||||
defer cleanupDaemonSets(t, clientset, ds)
|
||||
|
||||
node := newNode("node-with-limited-memory", nil)
|
||||
node.Status.Allocatable = allocatableResources("100M", "200m")
|
||||
_, err = nodeClient.Create(node)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create node: %v", err)
|
||||
}
|
||||
|
||||
if err := waitForPodsCreated(podInformer, 1); err != nil {
|
||||
t.Errorf("Failed to wait for pods created: %v", err)
|
||||
}
|
||||
|
||||
objects := podInformer.GetIndexer().List()
|
||||
for _, object := range objects {
|
||||
pod := object.(*v1.Pod)
|
||||
if err := waitForPodUnschedulable(clientset, pod); err != nil {
|
||||
t.Errorf("Failed to wait for unschedulable status of pod %+v", pod)
|
||||
}
|
||||
}
|
||||
|
||||
node1 := newNode("node-with-enough-memory", nil)
|
||||
node1.Status.Allocatable = allocatableResources("200M", "2000m")
|
||||
_, err = nodeClient.Create(node1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create node: %v", err)
|
||||
}
|
||||
|
||||
// When ScheduleDaemonSetPods enabled, 2 pods are created. But only one
|
||||
// of two Pods is scheduled by default scheduler.
|
||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
|
||||
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user