Updated integration test.

This commit is contained in:
Da K. Ma 2018-06-02 08:39:28 +08:00
parent 8180e1e60f
commit 9fd848e5ec
2 changed files with 351 additions and 52 deletions

View File

@ -14,23 +14,32 @@ go_test(
], ],
tags = ["integration"], tags = ["integration"],
deps = [ deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/controller/daemon:go_default_library", "//pkg/controller/daemon:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/util/metrics:go_default_library", "//pkg/util/metrics:go_default_library",
"//test/integration/framework:go_default_library", "//test/integration/framework:go_default_library",
"//vendor/k8s.io/api/apps/v1:go_default_library", "//vendor/k8s.io/api/apps/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
], ],
) )

View File

@ -24,19 +24,30 @@ import (
apps "k8s.io/api/apps/v1" apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
appstyped "k8s.io/client-go/kubernetes/typed/apps/v1" appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
corev1typed "k8s.io/client-go/kubernetes/typed/core/v1" corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/pkg/scheduler/factory"
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
@ -69,6 +80,62 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS
return server, closeFn, dc, informers, clientSet return server, closeFn, dc, informers, clientSet
} }
func setupScheduler(
t *testing.T,
cs clientset.Interface,
informerFactory informers.SharedInformerFactory,
stopCh chan struct{},
) {
// If ScheduleDaemonSetPods is disabled, do not start scheduler.
if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
return
}
schedulerConfigFactory := factory.NewConfigFactory(
v1.DefaultSchedulerName,
cs,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
true,
false,
)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
}
schedulerConfig.StopEverything = stopCh
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.EventSource{Component: v1.DefaultSchedulerName},
)
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
Interface: cs.CoreV1().Events(""),
})
sched, err := scheduler.NewFromConfigurator(
&scheduler.FakeConfigurator{Config: schedulerConfig}, nil...)
if err != nil {
t.Fatalf("error creating scheduler: %v", err)
}
algorithmprovider.ApplyFeatureGates()
go sched.Run()
}
func testLabels() map[string]string { func testLabels() map[string]string {
return map[string]string{"name": "test"} return map[string]string{"name": "test"}
} }
@ -162,6 +229,12 @@ func updateStrategies() []*apps.DaemonSetUpdateStrategy {
return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()} return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
} }
func featureGates() []utilfeature.Feature {
return []utilfeature.Feature{
features.ScheduleDaemonSetPods,
}
}
func allocatableResources(memory, cpu string) v1.ResourceList { func allocatableResources(memory, cpu string) v1.ResourceList {
return v1.ResourceList{ return v1.ResourceList{
v1.ResourceMemory: resource.MustParse(memory), v1.ResourceMemory: resource.MustParse(memory),
@ -242,7 +315,7 @@ func validateDaemonSetPodsAndMarkReady(
t.Errorf("controllerRef.Controller is not set to true") t.Errorf("controllerRef.Controller is not set to true")
} }
if !podutil.IsPodReady(pod) { if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
podCopy := pod.DeepCopy() podCopy := pod.DeepCopy()
podCopy.Status = v1.PodStatus{ podCopy.Status = v1.PodStatus{
Phase: v1.PodRunning, Phase: v1.PodRunning,
@ -261,6 +334,44 @@ func validateDaemonSetPodsAndMarkReady(
} }
} }
// podUnschedulable returns a condition function that returns true if the given pod
// gets unschedulable status.
func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, nil
}
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason == v1.PodReasonUnschedulable, nil
}
}
// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the given timeout.
func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
}
// waitForPodUnschedule waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the timeout duration (30 seconds).
func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
return waitForPodUnschedulableWithTimeout(cs, pod, 10*time.Second)
}
// waitForPodsCreated waits for number of pods are created.
func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
return len(objects) == num, nil
})
}
func validateDaemonSetStatus( func validateDaemonSetStatus(
dsClient appstyped.DaemonSetInterface, dsClient appstyped.DaemonSetInterface,
dsName string, dsName string,
@ -302,6 +413,22 @@ func validateFailedPlacementEvent(eventClient corev1typed.EventInterface, t *tes
} }
} }
func forEachFeatureGate(t *testing.T, tf func(t *testing.T)) {
for _, fg := range featureGates() {
func() {
enabled := utilfeature.DefaultFeatureGate.Enabled(fg)
defer func() {
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, enabled))
}()
for _, f := range []bool{true, false} {
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, f))
t.Run(fmt.Sprintf("%v (%t)", fg, f), tf)
}
}()
}
}
func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) { func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
for _, strategy := range updateStrategies() { for _, strategy := range updateStrategies() {
t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy), t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
@ -310,6 +437,7 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe
} }
func TestOneNodeDaemonLaunchesPod(t *testing.T) { func TestOneNodeDaemonLaunchesPod(t *testing.T) {
forEachFeatureGate(t, func(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
server, closeFn, dc, informers, clientset := setup(t) server, closeFn, dc, informers, clientset := setup(t)
defer closeFn() defer closeFn()
@ -322,9 +450,13 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer() podInformer := informers.Core().V1().Pods().Informer()
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh) informers.Start(stopCh)
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
defer close(stopCh)
// Start Scheduler
setupScheduler(t, clientset, informers, stopCh)
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
@ -342,9 +474,11 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t) validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
validateDaemonSetStatus(dsClient, ds.Name, 1, t) validateDaemonSetStatus(dsClient, ds.Name, 1, t)
}) })
})
} }
func TestSimpleDaemonSetLaunchesPods(t *testing.T) { func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
forEachFeatureGate(t, func(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
server, closeFn, dc, informers, clientset := setup(t) server, closeFn, dc, informers, clientset := setup(t)
defer closeFn() defer closeFn()
@ -357,9 +491,13 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer() podInformer := informers.Core().V1().Pods().Informer()
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh) informers.Start(stopCh)
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
defer close(stopCh)
// Start Scheduler
setupScheduler(t, clientset, informers, stopCh)
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
@ -374,6 +512,78 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t) validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
validateDaemonSetStatus(dsClient, ds.Name, 5, t) validateDaemonSetStatus(dsClient, ds.Name, 5, t)
}) })
})
}
func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
forEachFeatureGate(t, func(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
server, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
defer framework.DeleteTestingNamespace(ns, server, t)
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
podClient := clientset.CoreV1().Pods(ns.Name)
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go dc.Run(5, stopCh)
// Start Scheduler
setupScheduler(t, clientset, informers, stopCh)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "zone",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test"},
},
},
},
{
MatchFields: []v1.NodeSelectorRequirement{
{
Key: algorithm.NodeFieldSelectorKeyNodeName,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node-1"},
},
},
},
},
},
},
}
_, err := dsClient.Create(ds)
if err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
}
defer cleanupDaemonSets(t, clientset, ds)
addNodes(nodeClient, 0, 2, nil, t)
// Two nodes with labels
addNodes(nodeClient, 2, 2, map[string]string{
"zone": "test",
}, t)
addNodes(nodeClient, 4, 2, nil, t)
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
validateDaemonSetStatus(dsClient, ds.Name, 3, t)
})
})
} }
func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
@ -389,9 +599,13 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer() podInformer := informers.Core().V1().Pods().Informer()
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh) informers.Start(stopCh)
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
defer close(stopCh)
// Start Scheduler
setupScheduler(t, clientset, informers, stopCh)
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
@ -399,6 +613,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err) t.Fatalf("Failed to create DaemonSet: %v", err)
} }
defer cleanupDaemonSets(t, clientset, ds) defer cleanupDaemonSets(t, clientset, ds)
node := newNode("single-node", nil) node := newNode("single-node", nil)
@ -427,9 +642,10 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
eventClient := clientset.CoreV1().Events(ns.Namespace) eventClient := clientset.CoreV1().Events(ns.Namespace)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh) informers.Start(stopCh)
go dc.Run(5, stopCh) go dc.Run(5, stopCh)
defer close(stopCh)
ds := newDaemonSet("foo", ns.Name) ds := newDaemonSet("foo", ns.Name)
ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m") ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m")
@ -450,3 +666,77 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
validateFailedPlacementEvent(eventClient, t) validateFailedPlacementEvent(eventClient, t)
}) })
} }
// TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests that when "ScheduleDaemonSetPods"
// feature is enabled, the DaemonSet should create Pods for all the nodes regardless of available resource
// on the nodes, and kube-scheduler should not schedule Pods onto the nodes with insufficient resource.
func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) {
enabled := utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods)
defer func() {
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t",
features.ScheduleDaemonSetPods, enabled))
}()
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.ScheduleDaemonSetPods, true))
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
server, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
defer framework.DeleteTestingNamespace(ns, server, t)
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
podClient := clientset.CoreV1().Pods(ns.Name)
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go dc.Run(5, stopCh)
// Start Scheduler
setupScheduler(t, clientset, informers, stopCh)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
ds.Spec.UpdateStrategy = *strategy
ds, err := dsClient.Create(ds)
if err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
}
defer cleanupDaemonSets(t, clientset, ds)
node := newNode("node-with-limited-memory", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
_, err = nodeClient.Create(node)
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}
if err := waitForPodsCreated(podInformer, 1); err != nil {
t.Errorf("Failed to wait for pods created: %v", err)
}
objects := podInformer.GetIndexer().List()
for _, object := range objects {
pod := object.(*v1.Pod)
if err := waitForPodUnschedulable(clientset, pod); err != nil {
t.Errorf("Failed to wait for unschedulable status of pod %+v", pod)
}
}
node1 := newNode("node-with-enough-memory", nil)
node1.Status.Allocatable = allocatableResources("200M", "2000m")
_, err = nodeClient.Create(node1)
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}
// When ScheduleDaemonSetPods enabled, 2 pods are created. But only one
// of two Pods is scheduled by default scheduler.
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
})
}