From c2c3c478cdc07688803a706e26cbec14955e0c0f Mon Sep 17 00:00:00 2001 From: tanjunchen Date: Mon, 10 Feb 2020 12:34:19 +0800 Subject: [PATCH] test/e2e/framework:move functions to test/e2e/scheduling/ --- test/e2e/framework/events/BUILD | 7 -- test/e2e/framework/events/events.go | 127 --------------------------- test/e2e/scheduling/BUILD | 2 +- test/e2e/scheduling/events.go | 131 ++++++++++++++++++++++++++++ test/e2e/scheduling/predicates.go | 9 +- test/e2e/scheduling/priorities.go | 3 +- 6 files changed, 137 insertions(+), 142 deletions(-) diff --git a/test/e2e/framework/events/BUILD b/test/e2e/framework/events/BUILD index 998dd597ec1..ebac16a84d2 100644 --- a/test/e2e/framework/events/BUILD +++ b/test/e2e/framework/events/BUILD @@ -6,16 +6,9 @@ go_library( importpath = "k8s.io/kubernetes/test/e2e/framework/events", visibility = ["//visibility:public"], deps = [ - "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//test/e2e/framework:go_default_library", - "//vendor/github.com/onsi/ginkgo:go_default_library", ], ) diff --git a/test/e2e/framework/events/events.go b/test/e2e/framework/events/events.go index 49e2dfdfda4..10612c0a937 100644 --- a/test/e2e/framework/events/events.go +++ b/test/e2e/framework/events/events.go @@ -20,140 +20,13 @@ import ( "context" "fmt" "strings" - "sync" "time" - "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/test/e2e/framework" - - "github.com/onsi/ginkgo" ) -// Action is a function to be performed by the system. -type Action func() error - -// ObserveNodeUpdateAfterAction returns true if a node update matching the predicate was emitted -// from the system after performing the supplied action. -func ObserveNodeUpdateAfterAction(c clientset.Interface, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) { - observedMatchingNode := false - nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName) - informerStartedChan := make(chan struct{}) - var informerStartedGuard sync.Once - - _, controller := cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = nodeSelector.String() - ls, err := c.CoreV1().Nodes().List(context.TODO(), options) - return ls, err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // Signal parent goroutine that watching has begun. - defer informerStartedGuard.Do(func() { close(informerStartedChan) }) - options.FieldSelector = nodeSelector.String() - w, err := c.CoreV1().Nodes().Watch(context.TODO(), options) - return w, err - }, - }, - &v1.Node{}, - 0, - cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(oldObj, newObj interface{}) { - n, ok := newObj.(*v1.Node) - framework.ExpectEqual(ok, true) - if nodePredicate(n) { - observedMatchingNode = true - } - }, - }, - ) - - // Start the informer and block this goroutine waiting for the started signal. - informerStopChan := make(chan struct{}) - defer func() { close(informerStopChan) }() - go controller.Run(informerStopChan) - <-informerStartedChan - - // Invoke the action function. - err := action() - if err != nil { - return false, err - } - - // Poll whether the informer has found a matching node update with a timeout. - // Wait up 2 minutes polling every second. - timeout := 2 * time.Minute - interval := 1 * time.Second - err = wait.Poll(interval, timeout, func() (bool, error) { - return observedMatchingNode, nil - }) - return err == nil, err -} - -// ObserveEventAfterAction returns true if an event matching the predicate was emitted -// from the system after performing the supplied action. -func ObserveEventAfterAction(c clientset.Interface, ns string, eventPredicate func(*v1.Event) bool, action Action) (bool, error) { - observedMatchingEvent := false - informerStartedChan := make(chan struct{}) - var informerStartedGuard sync.Once - - // Create an informer to list/watch events from the test framework namespace. - _, controller := cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - ls, err := c.CoreV1().Events(ns).List(context.TODO(), options) - return ls, err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // Signal parent goroutine that watching has begun. - defer informerStartedGuard.Do(func() { close(informerStartedChan) }) - w, err := c.CoreV1().Events(ns).Watch(context.TODO(), options) - return w, err - }, - }, - &v1.Event{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - e, ok := obj.(*v1.Event) - ginkgo.By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message)) - framework.ExpectEqual(ok, true) - if eventPredicate(e) { - observedMatchingEvent = true - } - }, - }, - ) - - // Start the informer and block this goroutine waiting for the started signal. - informerStopChan := make(chan struct{}) - defer func() { close(informerStopChan) }() - go controller.Run(informerStopChan) - <-informerStartedChan - - // Invoke the action function. - err := action() - if err != nil { - return false, err - } - - // Poll whether the informer has found a matching event with a timeout. - // Wait up 2 minutes polling every second. - timeout := 2 * time.Minute - interval := 1 * time.Second - err = wait.Poll(interval, timeout, func() (bool, error) { - return observedMatchingEvent, nil - }) - return err == nil, err -} - // WaitTimeoutForEvent waits the given timeout duration for an event to occur. func WaitTimeoutForEvent(c clientset.Interface, namespace, eventSelector, msg string, timeout time.Duration) error { interval := 2 * time.Second diff --git a/test/e2e/scheduling/BUILD b/test/e2e/scheduling/BUILD index 4e3172771a3..a7bec3e7646 100644 --- a/test/e2e/scheduling/BUILD +++ b/test/e2e/scheduling/BUILD @@ -28,6 +28,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", @@ -40,7 +41,6 @@ go_library( "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//test/e2e/framework:go_default_library", - "//test/e2e/framework/events:go_default_library", "//test/e2e/framework/gpu:go_default_library", "//test/e2e/framework/job:go_default_library", "//test/e2e/framework/kubelet:go_default_library", diff --git a/test/e2e/scheduling/events.go b/test/e2e/scheduling/events.go index df55b40588f..9cff05a88b7 100644 --- a/test/e2e/scheduling/events.go +++ b/test/e2e/scheduling/events.go @@ -17,10 +17,23 @@ limitations under the License. package scheduling import ( + "context" "fmt" "strings" + "sync" + "time" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/test/e2e/framework" + + "github.com/onsi/ginkgo" ) func scheduleSuccessEvent(ns, podName, nodeName string) func(*v1.Event) bool { @@ -39,3 +52,121 @@ func scheduleFailureEvent(podName string) func(*v1.Event) bool { e.Reason == "FailedScheduling" } } + +// Action is a function to be performed by the system. +type Action func() error + +// observeNodeUpdateAfterAction returns true if a node update matching the predicate was emitted +// from the system after performing the supplied action. +func observeNodeUpdateAfterAction(c clientset.Interface, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) { + observedMatchingNode := false + nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName) + informerStartedChan := make(chan struct{}) + var informerStartedGuard sync.Once + + _, controller := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = nodeSelector.String() + ls, err := c.CoreV1().Nodes().List(context.TODO(), options) + return ls, err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Signal parent goroutine that watching has begun. + defer informerStartedGuard.Do(func() { close(informerStartedChan) }) + options.FieldSelector = nodeSelector.String() + w, err := c.CoreV1().Nodes().Watch(context.TODO(), options) + return w, err + }, + }, + &v1.Node{}, + 0, + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + n, ok := newObj.(*v1.Node) + framework.ExpectEqual(ok, true) + if nodePredicate(n) { + observedMatchingNode = true + } + }, + }, + ) + + // Start the informer and block this goroutine waiting for the started signal. + informerStopChan := make(chan struct{}) + defer func() { close(informerStopChan) }() + go controller.Run(informerStopChan) + <-informerStartedChan + + // Invoke the action function. + err := action() + if err != nil { + return false, err + } + + // Poll whether the informer has found a matching node update with a timeout. + // Wait up 2 minutes polling every second. + timeout := 2 * time.Minute + interval := 1 * time.Second + err = wait.Poll(interval, timeout, func() (bool, error) { + return observedMatchingNode, nil + }) + return err == nil, err +} + +// observeEventAfterAction returns true if an event matching the predicate was emitted +// from the system after performing the supplied action. +func observeEventAfterAction(c clientset.Interface, ns string, eventPredicate func(*v1.Event) bool, action Action) (bool, error) { + observedMatchingEvent := false + informerStartedChan := make(chan struct{}) + var informerStartedGuard sync.Once + + // Create an informer to list/watch events from the test framework namespace. + _, controller := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + ls, err := c.CoreV1().Events(ns).List(context.TODO(), options) + return ls, err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Signal parent goroutine that watching has begun. + defer informerStartedGuard.Do(func() { close(informerStartedChan) }) + w, err := c.CoreV1().Events(ns).Watch(context.TODO(), options) + return w, err + }, + }, + &v1.Event{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + e, ok := obj.(*v1.Event) + ginkgo.By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message)) + framework.ExpectEqual(ok, true) + if eventPredicate(e) { + observedMatchingEvent = true + } + }, + }, + ) + + // Start the informer and block this goroutine waiting for the started signal. + informerStopChan := make(chan struct{}) + defer func() { close(informerStopChan) }() + go controller.Run(informerStopChan) + <-informerStartedChan + + // Invoke the action function. + err := action() + if err != nil { + return false, err + } + + // Poll whether the informer has found a matching event with a timeout. + // Wait up 2 minutes polling every second. + timeout := 2 * time.Minute + interval := 1 * time.Second + err = wait.Poll(interval, timeout, func() (bool, error) { + return observedMatchingEvent, nil + }) + return err == nil, err +} diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go index edfc80c4762..faa5b1f9f01 100644 --- a/test/e2e/scheduling/predicates.go +++ b/test/e2e/scheduling/predicates.go @@ -30,7 +30,6 @@ import ( clientset "k8s.io/client-go/kubernetes" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" - e2eevents "k8s.io/kubernetes/test/e2e/framework/events" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -708,7 +707,7 @@ func getRequestedStorageEphemeralStorage(pod v1.Pod) int64 { // removeTaintFromNodeAction returns a closure that removes the given taint // from the given node upon invocation. -func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, testTaint v1.Taint) e2eevents.Action { +func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, testTaint v1.Taint) Action { return func() error { framework.RemoveTaintOffNode(cs, nodeName, testTaint) return nil @@ -716,7 +715,7 @@ func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, testTain } // createPausePodAction returns a closure that creates a pause pod upon invocation. -func createPausePodAction(f *framework.Framework, conf pausePodConfig) e2eevents.Action { +func createPausePodAction(f *framework.Framework, conf pausePodConfig) Action { return func() error { _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(context.TODO(), initPausePod(f, conf), metav1.CreateOptions{}) return err @@ -725,12 +724,12 @@ func createPausePodAction(f *framework.Framework, conf pausePodConfig) e2eevents // WaitForSchedulerAfterAction performs the provided action and then waits for // scheduler to act on the given pod. -func WaitForSchedulerAfterAction(f *framework.Framework, action e2eevents.Action, ns, podName string, expectSuccess bool) { +func WaitForSchedulerAfterAction(f *framework.Framework, action Action, ns, podName string, expectSuccess bool) { predicate := scheduleFailureEvent(podName) if expectSuccess { predicate = scheduleSuccessEvent(ns, podName, "" /* any node */) } - success, err := e2eevents.ObserveEventAfterAction(f.ClientSet, f.Namespace.Name, predicate, action) + success, err := observeEventAfterAction(f.ClientSet, f.Namespace.Name, predicate, action) framework.ExpectNoError(err) framework.ExpectEqual(success, true) } diff --git a/test/e2e/scheduling/priorities.go b/test/e2e/scheduling/priorities.go index 629af641fff..6b82c86ee44 100644 --- a/test/e2e/scheduling/priorities.go +++ b/test/e2e/scheduling/priorities.go @@ -38,7 +38,6 @@ import ( v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/test/e2e/framework" - e2eevents "k8s.io/kubernetes/test/e2e/framework/events" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2erc "k8s.io/kubernetes/test/e2e/framework/rc" @@ -285,7 +284,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() { } return node.Annotations[v1.PreferAvoidPodsAnnotationKey] == string(val) } - success, err := e2eevents.ObserveNodeUpdateAfterAction(f.ClientSet, nodeName, predicate, action) + success, err := observeNodeUpdateAfterAction(f.ClientSet, nodeName, predicate, action) framework.ExpectNoError(err) framework.ExpectEqual(success, true)