Move WatchUntilWithoutRetry() from e2e framework

WatchUntilWithoutRetry() is called from e2e/apps/rc.go only.
So this moves the function to reduce e2e/framework/util.go code.
This commit is contained in:
Kenichi Omichi 2020-07-13 22:54:41 +00:00
parent 240a72b5c0
commit c86caa608d
2 changed files with 56 additions and 56 deletions

View File

@ -147,7 +147,7 @@ var _ = SIGDescribe("ReplicationController", func() {
eventFound := false
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Added {
return false, nil
}
@ -162,7 +162,7 @@ var _ = SIGDescribe("ReplicationController", func() {
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
var rc *v1.ReplicationController
rcBytes, err := json.Marshal(watchEvent.Object)
if err != nil {
@ -196,7 +196,7 @@ var _ = SIGDescribe("ReplicationController", func() {
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
@ -224,7 +224,7 @@ var _ = SIGDescribe("ReplicationController", func() {
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
@ -236,7 +236,7 @@ var _ = SIGDescribe("ReplicationController", func() {
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
ginkgo.By("waiting for available Replicas")
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
var rc *v1.ReplicationController
rcBytes, err := json.Marshal(watchEvent.Object)
if err != nil {
@ -278,7 +278,7 @@ var _ = SIGDescribe("ReplicationController", func() {
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
@ -291,7 +291,7 @@ var _ = SIGDescribe("ReplicationController", func() {
ginkgo.By("waiting for ReplicationController's scale to be the max amount")
eventFound = false
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
var rc *v1.ReplicationController
rcBytes, err := json.Marshal(watchEvent.Object)
if err != nil {
@ -329,7 +329,7 @@ var _ = SIGDescribe("ReplicationController", func() {
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
@ -366,7 +366,7 @@ var _ = SIGDescribe("ReplicationController", func() {
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Deleted {
return false, nil
}
@ -676,3 +676,50 @@ func updateReplicationControllerWithRetries(c clientset.Interface, namespace, na
}
return rc, pollErr
}
// watchUntilWithoutRetry ...
// reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// Waits until context deadline or until context is canceled.
//
// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection
func watchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
ch := watcher.ResultChan()
var lastEvent *watch.Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
continue
}
}
ConditionSucceeded:
for {
select {
case event, ok := <-ch:
if !ok {
return lastEvent, watchtools.ErrWatchClosed
}
lastEvent = &event
done, err := condition(event)
if err != nil {
return lastEvent, err
}
if done {
break ConditionSucceeded
}
case <-ctx.Done():
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}

View File

@ -1375,50 +1375,3 @@ retriesLoop:
break retriesLoop
}
}
// WatchUntilWithoutRetry ...
// reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// Waits until context deadline or until context is canceled.
//
// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection
func WatchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
ch := watcher.ResultChan()
var lastEvent *watch.Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
continue
}
}
ConditionSucceeded:
for {
select {
case event, ok := <-ch:
if !ok {
return lastEvent, watchtools.ErrWatchClosed
}
lastEvent = &event
done, err := condition(event)
if err != nil {
return lastEvent, err
}
if done {
break ConditionSucceeded
}
case <-ctx.Done():
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}