mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Update correct resource version used, watch retry function to not close
This commit is contained in:
parent
250bb35041
commit
1db0ca74a9
@ -1297,12 +1297,14 @@ func taintExists(taints []v1.Taint, taintToFind *v1.Taint) bool {
|
|||||||
// expectedWatchEvents array of events which are expected to occur
|
// expectedWatchEvents array of events which are expected to occur
|
||||||
// scenario the function to run
|
// scenario the function to run
|
||||||
func WatchEventSequenceVerifier(testContext context.Context, dc dynamic.Interface, resourceType schema.GroupVersionResource, namespace string, resourceName string, listOptions metav1.ListOptions, expectedWatchEvents []watch.Event, scenario func(*watchtools.RetryWatcher) []watch.Event) {
|
func WatchEventSequenceVerifier(testContext context.Context, dc dynamic.Interface, resourceType schema.GroupVersionResource, namespace string, resourceName string, listOptions metav1.ListOptions, expectedWatchEvents []watch.Event, scenario func(*watchtools.RetryWatcher) []watch.Event) {
|
||||||
|
initResource, err := dc.Resource(resourceType).Namespace(namespace).List(testContext, listOptions)
|
||||||
|
ExpectNoError(err, "Failed to fetch inital resource")
|
||||||
listWatcher := &cache.ListWatch{
|
listWatcher := &cache.ListWatch{
|
||||||
WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
|
WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
|
||||||
return dc.Resource(resourceType).Namespace(namespace).Watch(testContext, listOptions)
|
return dc.Resource(resourceType).Namespace(namespace).Watch(testContext, listOptions)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
resourceWatch, err := watchtools.NewRetryWatcher("1", listWatcher)
|
resourceWatch, err := watchtools.NewRetryWatcher(initResource.GetResourceVersion(), listWatcher)
|
||||||
ExpectNoError(err, "Failed to create a resource watch of %v in namespace %v", resourceType.Resource, namespace)
|
ExpectNoError(err, "Failed to create a resource watch of %v in namespace %v", resourceType.Resource, namespace)
|
||||||
|
|
||||||
// NOTE value of 3 retries seems to make sense
|
// NOTE value of 3 retries seems to make sense
|
||||||
@ -1334,12 +1336,51 @@ retriesLoop:
|
|||||||
if actualWatchEventsHasDelete == false {
|
if actualWatchEventsHasDelete == false {
|
||||||
_ = dc.Resource(resourceType).Namespace(namespace).DeleteCollection(testContext, metav1.DeleteOptions{}, listOptions)
|
_ = dc.Resource(resourceType).Namespace(namespace).DeleteCollection(testContext, metav1.DeleteOptions{}, listOptions)
|
||||||
}
|
}
|
||||||
// TODO restructure failures handling
|
|
||||||
if errs.Len() > 0 && try < retries {
|
if errs.Len() > 0 && try < retries {
|
||||||
fmt.Println(fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* ")))
|
fmt.Println("invariants violated:\n", strings.Join(errs.List(), "\n - "))
|
||||||
continue retriesLoop
|
continue retriesLoop
|
||||||
}
|
}
|
||||||
ExpectEqual(errs.Len() > 0, false, fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* ")))
|
ExpectEqual(errs.Len() > 0, false, strings.Join(errs.List(), "\n - "))
|
||||||
ExpectEqual(totalValidWatchEvents, len(expectedWatchEvents), "Error: there must be an equal amount of total valid watch events (%v) and expected watch events (%v)", totalValidWatchEvents, len(expectedWatchEvents))
|
ExpectEqual(totalValidWatchEvents, len(expectedWatchEvents), "Error: there must be an equal amount of total valid watch events (%v) and expected watch events (%v)", totalValidWatchEvents, len(expectedWatchEvents))
|
||||||
|
break retriesLoop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WatchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
||||||
|
ch := watcher.ResultChan()
|
||||||
|
var lastEvent *watch.Event
|
||||||
|
for _, condition := range conditions {
|
||||||
|
// check the next condition against the previous event and short circuit waiting for the next watch
|
||||||
|
if lastEvent != nil {
|
||||||
|
done, err := condition(*lastEvent)
|
||||||
|
if err != nil {
|
||||||
|
return lastEvent, err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ConditionSucceeded:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return lastEvent, watchtools.ErrWatchClosed
|
||||||
|
}
|
||||||
|
lastEvent = &event
|
||||||
|
|
||||||
|
done, err := condition(event)
|
||||||
|
if err != nil {
|
||||||
|
return lastEvent, err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
break ConditionSucceeded
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
return lastEvent, wait.ErrWaitTimeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lastEvent, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user