diff --git a/test/integration/daemonset/BUILD b/test/integration/daemonset/BUILD
index 138dddfb5bf..a1c60537057 100644
--- a/test/integration/daemonset/BUILD
+++ b/test/integration/daemonset/BUILD
@@ -14,23 +14,32 @@ go_test(
     ],
     tags = ["integration"],
     deps = [
+        "//pkg/api/legacyscheme:go_default_library",
         "//pkg/api/v1/pod:go_default_library",
         "//pkg/controller/daemon:go_default_library",
+        "//pkg/features:go_default_library",
+        "//pkg/scheduler:go_default_library",
+        "//pkg/scheduler/algorithm:go_default_library",
+        "//pkg/scheduler/algorithmprovider:go_default_library",
+        "//pkg/scheduler/factory:go_default_library",
         "//pkg/util/metrics:go_default_library",
         "//test/integration/framework:go_default_library",
         "//vendor/k8s.io/api/apps/v1:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/client-go/informers:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
+        "//vendor/k8s.io/client-go/tools/record:go_default_library",
     ],
 )
diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go
index 5c8cf8ee276..0a7a72ca16e 100644
--- a/test/integration/daemonset/daemonset_test.go
+++ b/test/integration/daemonset/daemonset_test.go
@@ -24,19 +24,30 @@ import (
     apps "k8s.io/api/apps/v1"
     "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/apimachinery/pkg/util/wait"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
     appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
+    clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
     corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
     restclient "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/tools/record"
+    "k8s.io/kubernetes/pkg/api/legacyscheme"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
     "k8s.io/kubernetes/pkg/controller/daemon"
+    "k8s.io/kubernetes/pkg/features"
+    "k8s.io/kubernetes/pkg/scheduler"
+    "k8s.io/kubernetes/pkg/scheduler/algorithm"
+    "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
+    _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
+    "k8s.io/kubernetes/pkg/scheduler/factory"
     "k8s.io/kubernetes/pkg/util/metrics"
     "k8s.io/kubernetes/test/integration/framework"
 )
@@ -69,6 +80,62 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS
     return server, closeFn, dc, informers, clientSet
 }
 
+func setupScheduler(
+    t *testing.T,
+    cs clientset.Interface,
+    informerFactory informers.SharedInformerFactory,
+    stopCh chan struct{},
+) {
+    // If ScheduleDaemonSetPods is disabled, do not start scheduler.
+    if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
+        return
+    }
+
+    schedulerConfigFactory := factory.NewConfigFactory(
+        v1.DefaultSchedulerName,
+        cs,
+        informerFactory.Core().V1().Nodes(),
+        informerFactory.Core().V1().Pods(),
+        informerFactory.Core().V1().PersistentVolumes(),
+        informerFactory.Core().V1().PersistentVolumeClaims(),
+        informerFactory.Core().V1().ReplicationControllers(),
+        informerFactory.Extensions().V1beta1().ReplicaSets(),
+        informerFactory.Apps().V1beta1().StatefulSets(),
+        informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
+        informerFactory.Storage().V1().StorageClasses(),
+        v1.DefaultHardPodAffinitySymmetricWeight,
+        true,
+        false,
+    )
+
+    schedulerConfig, err := schedulerConfigFactory.Create()
+    if err != nil {
+        t.Fatalf("Couldn't create scheduler config: %v", err)
+    }
+
+    schedulerConfig.StopEverything = stopCh
+
+    eventBroadcaster := record.NewBroadcaster()
+    schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
+        legacyscheme.Scheme,
+        v1.EventSource{Component: v1.DefaultSchedulerName},
+    )
+    eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
+        Interface: cs.CoreV1().Events(""),
+    })
+
+    sched, err := scheduler.NewFromConfigurator(
+        &scheduler.FakeConfigurator{Config: schedulerConfig}, nil...)
+    if err != nil {
+        t.Fatalf("error creating scheduler: %v", err)
+    }
+
+    algorithmprovider.ApplyFeatureGates()
+
+    go sched.Run()
+}
+
 func testLabels() map[string]string {
     return map[string]string{"name": "test"}
 }
@@ -162,6 +229,12 @@ func updateStrategies() []*apps.DaemonSetUpdateStrategy {
     return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
 }
 
+func featureGates() []utilfeature.Feature {
+    return []utilfeature.Feature{
+        features.ScheduleDaemonSetPods,
+    }
+}
+
 func allocatableResources(memory, cpu string) v1.ResourceList {
     return v1.ResourceList{
         v1.ResourceMemory: resource.MustParse(memory),
@@ -242,7 +315,7 @@ func validateDaemonSetPodsAndMarkReady(
             t.Errorf("controllerRef.Controller is not set to true")
         }
 
-        if !podutil.IsPodReady(pod) {
+        if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
             podCopy := pod.DeepCopy()
             podCopy.Status = v1.PodStatus{
                 Phase: v1.PodRunning,
@@ -261,6 +334,44 @@ func validateDaemonSetPodsAndMarkReady(
     }
 }
 
+// podUnschedulable returns a condition function that returns true if the given pod
+// gets unschedulable status.
+func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
+    return func() (bool, error) {
+        pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
+        if errors.IsNotFound(err) {
+            return false, nil
+        }
+        if err != nil {
+            // This could be a connection error so we want to retry.
+            return false, nil
+        }
+        _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
+        return cond != nil && cond.Status == v1.ConditionFalse &&
+            cond.Reason == v1.PodReasonUnschedulable, nil
+    }
+}
+
+// waitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns
+// an error if it does not become unschedulable within the given timeout.
+func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
+    return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
+}
+
+// waitForPodUnschedulable waits for a pod to fail scheduling and returns
+// an error if it does not become unschedulable within the timeout (10 seconds).
+func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
+    return waitForPodUnschedulableWithTimeout(cs, pod, 10*time.Second)
+}
+
+// waitForPodsCreated waits until the given number of pods has been created.
+func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
+    return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
+        objects := podInformer.GetIndexer().List()
+        return len(objects) == num, nil
+    })
+}
+
 func validateDaemonSetStatus(
     dsClient appstyped.DaemonSetInterface,
     dsName string,
@@ -302,6 +413,22 @@ func validateFailedPlacementEvent(eventClient corev1typed.EventInterface, t *tes
     }
 }
 
+func forEachFeatureGate(t *testing.T, tf func(t *testing.T)) {
+    for _, fg := range featureGates() {
+        func() {
+            enabled := utilfeature.DefaultFeatureGate.Enabled(fg)
+            defer func() {
+                utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, enabled))
+            }()
+
+            for _, f := range []bool{true, false} {
+                utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, f))
+                t.Run(fmt.Sprintf("%v (%t)", fg, f), tf)
+            }
+        }()
+    }
+}
+
 func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
     for _, strategy := range updateStrategies() {
         t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
@@ -310,69 +437,152 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe
 }
 
 func TestOneNodeDaemonLaunchesPod(t *testing.T) {
-    forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-        server, closeFn, dc, informers, clientset := setup(t)
-        defer closeFn()
-        ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
-        defer framework.DeleteTestingNamespace(ns, server, t)
+    forEachFeatureGate(t, func(t *testing.T) {
+        forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
+            server, closeFn, dc, informers, clientset := setup(t)
+            defer closeFn()
+            ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
+            defer framework.DeleteTestingNamespace(ns, server, t)
 
-        dsClient := clientset.AppsV1().DaemonSets(ns.Name)
-        podClient := clientset.CoreV1().Pods(ns.Name)
-        nodeClient := clientset.CoreV1().Nodes()
-        podInformer := informers.Core().V1().Pods().Informer()
+            dsClient := clientset.AppsV1().DaemonSets(ns.Name)
+            podClient := clientset.CoreV1().Pods(ns.Name)
+            nodeClient := clientset.CoreV1().Nodes()
+            podInformer := informers.Core().V1().Pods().Informer()
 
-        stopCh := make(chan struct{})
-        informers.Start(stopCh)
-        go dc.Run(5, stopCh)
-        defer close(stopCh)
+            stopCh := make(chan struct{})
+            defer close(stopCh)
 
-        ds := newDaemonSet("foo", ns.Name)
-        ds.Spec.UpdateStrategy = *strategy
-        _, err := dsClient.Create(ds)
-        if err != nil {
-            t.Fatalf("Failed to create DaemonSet: %v", err)
-        }
-        defer cleanupDaemonSets(t, clientset, ds)
+            informers.Start(stopCh)
+            go dc.Run(5, stopCh)
 
-        _, err = nodeClient.Create(newNode("single-node", nil))
-        if err != nil {
-            t.Fatalf("Failed to create node: %v", err)
-        }
+            // Start Scheduler
+            setupScheduler(t, clientset, informers, stopCh)
 
-        validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
-        validateDaemonSetStatus(dsClient, ds.Name, 1, t)
+            ds := newDaemonSet("foo", ns.Name)
+            ds.Spec.UpdateStrategy = *strategy
+            _, err := dsClient.Create(ds)
+            if err != nil {
+                t.Fatalf("Failed to create DaemonSet: %v", err)
+            }
+            defer cleanupDaemonSets(t, clientset, ds)
+
+            _, err = nodeClient.Create(newNode("single-node", nil))
+            if err != nil {
+                t.Fatalf("Failed to create node: %v", err)
+            }
 
+            validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
+            validateDaemonSetStatus(dsClient, ds.Name, 1, t)
+        })
     })
 }
 
 func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
-    forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-        server, closeFn, dc, informers, clientset := setup(t)
-        defer closeFn()
-        ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
-        defer framework.DeleteTestingNamespace(ns, server, t)
+    forEachFeatureGate(t, func(t *testing.T) {
+        forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
+            server, closeFn, dc, informers, clientset := setup(t)
+            defer closeFn()
+            ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
+            defer framework.DeleteTestingNamespace(ns, server, t)
 
-        dsClient := clientset.AppsV1().DaemonSets(ns.Name)
-        podClient := clientset.CoreV1().Pods(ns.Name)
-        nodeClient := clientset.CoreV1().Nodes()
-        podInformer := informers.Core().V1().Pods().Informer()
+            dsClient := clientset.AppsV1().DaemonSets(ns.Name)
+            podClient := clientset.CoreV1().Pods(ns.Name)
+            nodeClient := clientset.CoreV1().Nodes()
+            podInformer := informers.Core().V1().Pods().Informer()
 
-        stopCh := make(chan struct{})
-        informers.Start(stopCh)
-        go dc.Run(5, stopCh)
-        defer close(stopCh)
+            stopCh := make(chan struct{})
+            defer close(stopCh)
 
-        ds := newDaemonSet("foo", ns.Name)
-        ds.Spec.UpdateStrategy = *strategy
-        _, err := dsClient.Create(ds)
-        if err != nil {
-            t.Fatalf("Failed to create DaemonSet: %v", err)
-        }
-        defer cleanupDaemonSets(t, clientset, ds)
+            informers.Start(stopCh)
+            go dc.Run(5, stopCh)
 
-        addNodes(nodeClient, 0, 5, nil, t)
+            // Start Scheduler
+            setupScheduler(t, clientset, informers, stopCh)
 
-        validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
-        validateDaemonSetStatus(dsClient, ds.Name, 5, t)
+            ds := newDaemonSet("foo", ns.Name)
+            ds.Spec.UpdateStrategy = *strategy
+            _, err := dsClient.Create(ds)
+            if err != nil {
+                t.Fatalf("Failed to create DaemonSet: %v", err)
+            }
+            defer cleanupDaemonSets(t, clientset, ds)
+
+            addNodes(nodeClient, 0, 5, nil, t)
+
+            validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
+            validateDaemonSetStatus(dsClient, ds.Name, 5, t)
+        })
     })
 }
+
+func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
+    forEachFeatureGate(t, func(t *testing.T) {
+        forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
+            server, closeFn, dc, informers, clientset := setup(t)
+            defer closeFn()
+            ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
+            defer framework.DeleteTestingNamespace(ns, server, t)
+
+            dsClient := clientset.AppsV1().DaemonSets(ns.Name)
+            podClient := clientset.CoreV1().Pods(ns.Name)
+            nodeClient := clientset.CoreV1().Nodes()
+            podInformer := informers.Core().V1().Pods().Informer()
+
+            stopCh := make(chan struct{})
+            defer close(stopCh)
+
+            informers.Start(stopCh)
+            go dc.Run(5, stopCh)
+
+            // Start Scheduler
+            setupScheduler(t, clientset, informers, stopCh)
+
+            ds := newDaemonSet("foo", ns.Name)
+            ds.Spec.UpdateStrategy = *strategy
+
+            ds.Spec.Template.Spec.Affinity = &v1.Affinity{
+                NodeAffinity: &v1.NodeAffinity{
+                    RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+                        NodeSelectorTerms: []v1.NodeSelectorTerm{
+                            {
+                                MatchExpressions: []v1.NodeSelectorRequirement{
+                                    {
+                                        Key: "zone",
+                                        Operator: v1.NodeSelectorOpIn,
+                                        Values: []string{"test"},
+                                    },
+                                },
+                            },
+                            {
+                                MatchFields: []v1.NodeSelectorRequirement{
+                                    {
+                                        Key: algorithm.NodeFieldSelectorKeyNodeName,
+                                        Operator: v1.NodeSelectorOpIn,
+                                        Values: []string{"node-1"},
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            }
+
+            _, err := dsClient.Create(ds)
+            if err != nil {
+                t.Fatalf("Failed to create DaemonSet: %v", err)
+            }
+            defer cleanupDaemonSets(t, clientset, ds)
+
+            addNodes(nodeClient, 0, 2, nil, t)
+            // Two nodes with labels
+            addNodes(nodeClient, 2, 2, map[string]string{
+                "zone": "test",
+            }, t)
+            addNodes(nodeClient, 4, 2, nil, t)
+
+            validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
+            validateDaemonSetStatus(dsClient, ds.Name, 3, t)
+        })
    })
 }
@@ -389,9 +599,13 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
         podInformer := informers.Core().V1().Pods().Informer()
 
         stopCh := make(chan struct{})
+        defer close(stopCh)
+
         informers.Start(stopCh)
         go dc.Run(5, stopCh)
-        defer close(stopCh)
+
+        // Start Scheduler
+        setupScheduler(t, clientset, informers, stopCh)
 
         ds := newDaemonSet("foo", ns.Name)
         ds.Spec.UpdateStrategy = *strategy
@@ -399,6 +613,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
         if err != nil {
             t.Fatalf("Failed to create DaemonSet: %v", err)
         }
+        defer cleanupDaemonSets(t, clientset, ds)
 
         node := newNode("single-node", nil)
@@ -427,9 +642,10 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
         eventClient := clientset.CoreV1().Events(ns.Namespace)
 
         stopCh := make(chan struct{})
+        defer close(stopCh)
+
         informers.Start(stopCh)
         go dc.Run(5, stopCh)
-        defer close(stopCh)
 
         ds := newDaemonSet("foo", ns.Name)
         ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m")
@@ -450,3 +666,77 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
         validateFailedPlacementEvent(eventClient, t)
     })
 }
+
+// TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled tests that when the "ScheduleDaemonSetPods"
+// feature is enabled, the DaemonSet should create Pods for all the nodes regardless of the resources
+// available on the nodes, and kube-scheduler should not schedule Pods onto nodes with insufficient resources.
+func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) {
+    enabled := utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods)
+    defer func() {
+        utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t",
+            features.ScheduleDaemonSetPods, enabled))
+    }()
+
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.ScheduleDaemonSetPods, true))
+
+    forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
+        server, closeFn, dc, informers, clientset := setup(t)
+        defer closeFn()
+        ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
+        defer framework.DeleteTestingNamespace(ns, server, t)
+
+        dsClient := clientset.AppsV1().DaemonSets(ns.Name)
+        podClient := clientset.CoreV1().Pods(ns.Name)
+        podInformer := informers.Core().V1().Pods().Informer()
+        nodeClient := clientset.CoreV1().Nodes()
+        stopCh := make(chan struct{})
+        defer close(stopCh)
+
+        informers.Start(stopCh)
+        go dc.Run(5, stopCh)
+
+        // Start Scheduler
+        setupScheduler(t, clientset, informers, stopCh)
+
+        ds := newDaemonSet("foo", ns.Name)
+        ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
+        ds.Spec.UpdateStrategy = *strategy
+        ds, err := dsClient.Create(ds)
+        if err != nil {
+            t.Fatalf("Failed to create DaemonSet: %v", err)
+        }
+
+        defer cleanupDaemonSets(t, clientset, ds)
+
+        node := newNode("node-with-limited-memory", nil)
+        node.Status.Allocatable = allocatableResources("100M", "200m")
+        _, err = nodeClient.Create(node)
+        if err != nil {
+            t.Fatalf("Failed to create node: %v", err)
+        }
+
+        if err := waitForPodsCreated(podInformer, 1); err != nil {
+            t.Errorf("Failed to wait for pods created: %v", err)
+        }
+
+        objects := podInformer.GetIndexer().List()
+        for _, object := range objects {
+            pod := object.(*v1.Pod)
+            if err := waitForPodUnschedulable(clientset, pod); err != nil {
+                t.Errorf("Failed to wait for unschedulable status of pod %+v", pod)
+            }
+        }
+
+        node1 := newNode("node-with-enough-memory", nil)
+        node1.Status.Allocatable = allocatableResources("200M", "2000m")
+        _, err = nodeClient.Create(node1)
+        if err != nil {
+            t.Fatalf("Failed to create node: %v", err)
+        }
+
+        // When ScheduleDaemonSetPods is enabled, 2 Pods are created, but only one
+        // of the two Pods is scheduled by the default scheduler.
+        validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
+        validateDaemonSetStatus(dsClient, ds.Name, 1, t)
+    })
+}
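For readers outside the Kubernetes tree, the save/flip/restore pattern that forEachFeatureGate (and the last test above) applies to the feature gate can be sketched in isolation. This is an illustrative sketch, not part of the patch: runWithGate and its gate parameter are hypothetical names, and, like the helper in the patch, it ignores the error returned by Set.

package daemonset

import (
    "fmt"
    "testing"

    utilfeature "k8s.io/apiserver/pkg/util/feature"
)

// runWithGate runs tf twice, once with the given feature gate enabled and once
// disabled, restoring the gate's original value afterwards. It mirrors the
// forEachFeatureGate helper added in the patch.
func runWithGate(t *testing.T, gate utilfeature.Feature, tf func(t *testing.T)) {
    original := utilfeature.DefaultFeatureGate.Enabled(gate)
    // Restore the original value once the subtests are done; the error from Set
    // is ignored, matching the helper in the patch.
    defer utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", gate, original))

    for _, enabled := range []bool{true, false} {
        utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", gate, enabled))
        t.Run(fmt.Sprintf("%v (%t)", gate, enabled), tf)
    }
}

A stricter variant could check the error returned by Set and fail the test immediately, which would surface typos in gate names instead of silently running both subtests against the default configuration.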