mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Move ListWatchUntil to its kin
This commit is contained in:
parent
a8fbaf95f9
commit
07b8373ab3
@ -20,15 +20,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/pager"
|
"k8s.io/client-go/tools/pager"
|
||||||
watchtools "k8s.io/client-go/tools/watch"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||||
@ -113,78 +110,3 @@ func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
|
|||||||
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
return lw.WatchFunc(options)
|
return lw.WatchFunc(options)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
|
||||||
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
|
||||||
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
|
||||||
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
|
||||||
if len(conditions) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
list, err := lw.List(metav1.ListOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
initialItems, err := meta.ExtractList(list)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// use the initial items as simulated "adds"
|
|
||||||
var lastEvent *watch.Event
|
|
||||||
currIndex := 0
|
|
||||||
passedConditions := 0
|
|
||||||
for _, condition := range conditions {
|
|
||||||
// check the next condition against the previous event and short circuit waiting for the next watch
|
|
||||||
if lastEvent != nil {
|
|
||||||
done, err := condition(*lastEvent)
|
|
||||||
if err != nil {
|
|
||||||
return lastEvent, err
|
|
||||||
}
|
|
||||||
if done {
|
|
||||||
passedConditions = passedConditions + 1
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ConditionSucceeded:
|
|
||||||
for currIndex < len(initialItems) {
|
|
||||||
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
|
|
||||||
currIndex++
|
|
||||||
|
|
||||||
done, err := condition(*lastEvent)
|
|
||||||
if err != nil {
|
|
||||||
return lastEvent, err
|
|
||||||
}
|
|
||||||
if done {
|
|
||||||
passedConditions = passedConditions + 1
|
|
||||||
break ConditionSucceeded
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if passedConditions == len(conditions) {
|
|
||||||
return lastEvent, nil
|
|
||||||
}
|
|
||||||
remainingConditions := conditions[passedConditions:]
|
|
||||||
|
|
||||||
metaObj, err := meta.ListAccessor(list)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
currResourceVersion := metaObj.GetResourceVersion()
|
|
||||||
|
|
||||||
watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
|
||||||
defer cancel()
|
|
||||||
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
|
|
||||||
if err == watchtools.ErrWatchClosed {
|
|
||||||
// present a consistent error interface to callers
|
|
||||||
err = wait.ErrWaitTimeout
|
|
||||||
}
|
|
||||||
return evt, err
|
|
||||||
}
|
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
@ -100,3 +101,78 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (
|
|||||||
|
|
||||||
return context.WithTimeout(parent, timeout)
|
return context.WithTimeout(parent, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
||||||
|
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
||||||
|
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
||||||
|
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
||||||
|
if len(conditions) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
list, err := lw.List(metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
initialItems, err := meta.ExtractList(list)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// use the initial items as simulated "adds"
|
||||||
|
var lastEvent *watch.Event
|
||||||
|
currIndex := 0
|
||||||
|
passedConditions := 0
|
||||||
|
for _, condition := range conditions {
|
||||||
|
// check the next condition against the previous event and short circuit waiting for the next watch
|
||||||
|
if lastEvent != nil {
|
||||||
|
done, err := condition(*lastEvent)
|
||||||
|
if err != nil {
|
||||||
|
return lastEvent, err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
passedConditions = passedConditions + 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ConditionSucceeded:
|
||||||
|
for currIndex < len(initialItems) {
|
||||||
|
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
|
||||||
|
currIndex++
|
||||||
|
|
||||||
|
done, err := condition(*lastEvent)
|
||||||
|
if err != nil {
|
||||||
|
return lastEvent, err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
passedConditions = passedConditions + 1
|
||||||
|
break ConditionSucceeded
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if passedConditions == len(conditions) {
|
||||||
|
return lastEvent, nil
|
||||||
|
}
|
||||||
|
remainingConditions := conditions[passedConditions:]
|
||||||
|
|
||||||
|
metaObj, err := meta.ListAccessor(list)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
currResourceVersion := metaObj.GetResourceVersion()
|
||||||
|
|
||||||
|
watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
|
||||||
|
if err == watchtools.ErrWatchClosed {
|
||||||
|
// present a consistent error interface to callers
|
||||||
|
err = wait.ErrWaitTimeout
|
||||||
|
}
|
||||||
|
return evt, err
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user