diff --git a/test/e2e/apps/rc.go b/test/e2e/apps/rc.go index 08573c293a0..33526c05a29 100644 --- a/test/e2e/apps/rc.go +++ b/test/e2e/apps/rc.go @@ -147,7 +147,7 @@ var _ = SIGDescribe("ReplicationController", func() { eventFound := false ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { if watchEvent.Type != watch.Added { return false, nil } @@ -162,7 +162,7 @@ var _ = SIGDescribe("ReplicationController", func() { eventFound = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { var rc *v1.ReplicationController rcBytes, err := json.Marshal(watchEvent.Object) if err != nil { @@ -196,7 +196,7 @@ var _ = SIGDescribe("ReplicationController", func() { eventFound = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { if watchEvent.Type != watch.Modified { return false, nil } @@ -224,7 +224,7 @@ var _ = SIGDescribe("ReplicationController", func() { eventFound = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { if watchEvent.Type != watch.Modified { return false, nil } @@ -236,7 +236,7 @@ var _ = SIGDescribe("ReplicationController", func() { framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added) ginkgo.By("waiting for available Replicas") - _, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) { var rc *v1.ReplicationController rcBytes, err := json.Marshal(watchEvent.Object) if err != nil { @@ -278,7 +278,7 @@ var _ = SIGDescribe("ReplicationController", func() { eventFound = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { if watchEvent.Type != watch.Modified { return false, nil } @@ -291,7 +291,7 @@ var _ = SIGDescribe("ReplicationController", func() { ginkgo.By("waiting for ReplicationController's scale to be the max amount") eventFound = false - _, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) { var rc *v1.ReplicationController rcBytes, err := json.Marshal(watchEvent.Object) if err != nil { @@ -329,7 +329,7 @@ var _ = SIGDescribe("ReplicationController", func() { eventFound = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { if watchEvent.Type != watch.Modified { return false, nil } @@ -366,7 +366,7 @@ var _ = SIGDescribe("ReplicationController", func() { eventFound = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + _, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { if watchEvent.Type != watch.Deleted { return false, nil } @@ -676,3 +676,50 @@ func updateReplicationControllerWithRetries(c clientset.Interface, namespace, na } return rc, pollErr } + +// watchUntilWithoutRetry ... +// reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +// Waits until context deadline or until context is canceled. +// +// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection +func watchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { + ch := watcher.ResultChan() + var lastEvent *watch.Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + continue + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, watchtools.ErrWatchClosed + } + lastEvent = &event + + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-ctx.Done(): + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 1a777315063..44c5d7623e3 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -1375,50 +1375,3 @@ retriesLoop: break retriesLoop } } - -// WatchUntilWithoutRetry ... -// reads items from the watch until each provided condition succeeds, and then returns the last watch -// encountered. The first condition that returns an error terminates the watch (and the event is also returned). -// If no event has been received, the returned event will be nil. -// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. -// Waits until context deadline or until context is canceled. -// -// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection -func WatchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { - ch := watcher.ResultChan() - var lastEvent *watch.Event - for _, condition := range conditions { - // check the next condition against the previous event and short circuit waiting for the next watch - if lastEvent != nil { - done, err := condition(*lastEvent) - if err != nil { - return lastEvent, err - } - if done { - continue - } - } - ConditionSucceeded: - for { - select { - case event, ok := <-ch: - if !ok { - return lastEvent, watchtools.ErrWatchClosed - } - lastEvent = &event - - done, err := condition(event) - if err != nil { - return lastEvent, err - } - if done { - break ConditionSucceeded - } - - case <-ctx.Done(): - return lastEvent, wait.ErrWaitTimeout - } - } - } - return lastEvent, nil -}