mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #93046 from oomichi/WatchUntilWithoutRetry
Move WatchUntilWithoutRetry() from e2e framework
This commit is contained in:
commit
a04f1b8d59
@ -147,7 +147,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
eventFound := false
|
eventFound := false
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
if watchEvent.Type != watch.Added {
|
if watchEvent.Type != watch.Added {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -162,7 +162,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
eventFound = false
|
eventFound = false
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
var rc *v1.ReplicationController
|
var rc *v1.ReplicationController
|
||||||
rcBytes, err := json.Marshal(watchEvent.Object)
|
rcBytes, err := json.Marshal(watchEvent.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -196,7 +196,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
eventFound = false
|
eventFound = false
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
if watchEvent.Type != watch.Modified {
|
if watchEvent.Type != watch.Modified {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -224,7 +224,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
eventFound = false
|
eventFound = false
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
if watchEvent.Type != watch.Modified {
|
if watchEvent.Type != watch.Modified {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -236,7 +236,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
|
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
|
||||||
|
|
||||||
ginkgo.By("waiting for available Replicas")
|
ginkgo.By("waiting for available Replicas")
|
||||||
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
var rc *v1.ReplicationController
|
var rc *v1.ReplicationController
|
||||||
rcBytes, err := json.Marshal(watchEvent.Object)
|
rcBytes, err := json.Marshal(watchEvent.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -278,7 +278,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
eventFound = false
|
eventFound = false
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
if watchEvent.Type != watch.Modified {
|
if watchEvent.Type != watch.Modified {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -291,7 +291,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
|
|
||||||
ginkgo.By("waiting for ReplicationController's scale to be the max amount")
|
ginkgo.By("waiting for ReplicationController's scale to be the max amount")
|
||||||
eventFound = false
|
eventFound = false
|
||||||
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
var rc *v1.ReplicationController
|
var rc *v1.ReplicationController
|
||||||
rcBytes, err := json.Marshal(watchEvent.Object)
|
rcBytes, err := json.Marshal(watchEvent.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -329,7 +329,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
eventFound = false
|
eventFound = false
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
if watchEvent.Type != watch.Modified {
|
if watchEvent.Type != watch.Modified {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -366,7 +366,7 @@ var _ = SIGDescribe("ReplicationController", func() {
|
|||||||
eventFound = false
|
eventFound = false
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
|
||||||
if watchEvent.Type != watch.Deleted {
|
if watchEvent.Type != watch.Deleted {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -676,3 +676,50 @@ func updateReplicationControllerWithRetries(c clientset.Interface, namespace, na
|
|||||||
}
|
}
|
||||||
return rc, pollErr
|
return rc, pollErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// watchUntilWithoutRetry ...
|
||||||
|
// reads items from the watch until each provided condition succeeds, and then returns the last watch
|
||||||
|
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
||||||
|
// If no event has been received, the returned event will be nil.
|
||||||
|
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
||||||
|
// Waits until context deadline or until context is canceled.
|
||||||
|
//
|
||||||
|
// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection
|
||||||
|
func watchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
||||||
|
ch := watcher.ResultChan()
|
||||||
|
var lastEvent *watch.Event
|
||||||
|
for _, condition := range conditions {
|
||||||
|
// check the next condition against the previous event and short circuit waiting for the next watch
|
||||||
|
if lastEvent != nil {
|
||||||
|
done, err := condition(*lastEvent)
|
||||||
|
if err != nil {
|
||||||
|
return lastEvent, err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ConditionSucceeded:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return lastEvent, watchtools.ErrWatchClosed
|
||||||
|
}
|
||||||
|
lastEvent = &event
|
||||||
|
|
||||||
|
done, err := condition(event)
|
||||||
|
if err != nil {
|
||||||
|
return lastEvent, err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
break ConditionSucceeded
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
return lastEvent, wait.ErrWaitTimeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lastEvent, nil
|
||||||
|
}
|
||||||
|
@ -1383,50 +1383,3 @@ retriesLoop:
|
|||||||
break retriesLoop
|
break retriesLoop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchUntilWithoutRetry ...
|
|
||||||
// reads items from the watch until each provided condition succeeds, and then returns the last watch
|
|
||||||
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
|
||||||
// If no event has been received, the returned event will be nil.
|
|
||||||
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
|
||||||
// Waits until context deadline or until context is canceled.
|
|
||||||
//
|
|
||||||
// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection
|
|
||||||
func WatchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
|
||||||
ch := watcher.ResultChan()
|
|
||||||
var lastEvent *watch.Event
|
|
||||||
for _, condition := range conditions {
|
|
||||||
// check the next condition against the previous event and short circuit waiting for the next watch
|
|
||||||
if lastEvent != nil {
|
|
||||||
done, err := condition(*lastEvent)
|
|
||||||
if err != nil {
|
|
||||||
return lastEvent, err
|
|
||||||
}
|
|
||||||
if done {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ConditionSucceeded:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
return lastEvent, watchtools.ErrWatchClosed
|
|
||||||
}
|
|
||||||
lastEvent = &event
|
|
||||||
|
|
||||||
done, err := condition(event)
|
|
||||||
if err != nil {
|
|
||||||
return lastEvent, err
|
|
||||||
}
|
|
||||||
if done {
|
|
||||||
break ConditionSucceeded
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
|
||||||
return lastEvent, wait.ErrWaitTimeout
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return lastEvent, nil
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user