mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-27 23:48:30 +00:00
Rename Until to UntilWithoutRetry and move to using context so it's
cancelable Kubernetes-commit: 3d4a02abb54244861f9f05b8db2fdfdaa2c6f67c
This commit is contained in:
parent
5764fa1b79
commit
cbdb98d74d
9
tools/cache/listwatch.go
vendored
9
tools/cache/listwatch.go
vendored
@ -28,6 +28,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/pager"
|
"k8s.io/client-go/tools/pager"
|
||||||
|
watchtools "k8s.io/client-go/tools/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||||
@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
|
|||||||
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
||||||
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
||||||
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
||||||
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
|
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
||||||
if len(conditions) == 0 {
|
if len(conditions) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
|
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||||
if err == watch.ErrWatchClosed {
|
defer cancel()
|
||||||
|
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
|
||||||
|
if err == watchtools.ErrWatchClosed {
|
||||||
// present a consistent error interface to callers
|
// present a consistent error interface to callers
|
||||||
err = wait.ErrWaitTimeout
|
err = wait.ErrWaitTimeout
|
||||||
}
|
}
|
||||||
|
@ -17,38 +17,39 @@ limitations under the License.
|
|||||||
package watch
|
package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
|
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
|
||||||
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
|
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
|
||||||
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
|
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
|
||||||
// from false to true).
|
// from false to true).
|
||||||
type ConditionFunc func(event Event) (bool, error)
|
type ConditionFunc func(event watch.Event) (bool, error)
|
||||||
|
|
||||||
// ErrWatchClosed is returned when the watch channel is closed before timeout in Until.
|
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
|
||||||
var ErrWatchClosed = errors.New("watch closed before Until timeout")
|
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
|
||||||
|
|
||||||
// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
|
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
|
||||||
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
||||||
// If no event has been received, the returned event will be nil.
|
// If no event has been received, the returned event will be nil.
|
||||||
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
||||||
// A zero timeout means to wait forever.
|
// Waits until context deadline or until context is canceled.
|
||||||
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
|
//
|
||||||
|
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
|
||||||
|
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
|
||||||
|
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
|
||||||
|
// Warning: solving such issues.
|
||||||
|
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
|
||||||
|
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
|
||||||
ch := watcher.ResultChan()
|
ch := watcher.ResultChan()
|
||||||
defer watcher.Stop()
|
defer watcher.Stop()
|
||||||
var after <-chan time.Time
|
var lastEvent *watch.Event
|
||||||
if timeout > 0 {
|
|
||||||
after = time.After(timeout)
|
|
||||||
} else {
|
|
||||||
ch := make(chan time.Time)
|
|
||||||
defer close(ch)
|
|
||||||
after = ch
|
|
||||||
}
|
|
||||||
var lastEvent *Event
|
|
||||||
for _, condition := range conditions {
|
for _, condition := range conditions {
|
||||||
// check the next condition against the previous event and short circuit waiting for the next watch
|
// check the next condition against the previous event and short circuit waiting for the next watch
|
||||||
if lastEvent != nil {
|
if lastEvent != nil {
|
||||||
@ -69,7 +70,6 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
|
|||||||
}
|
}
|
||||||
lastEvent = &event
|
lastEvent = &event
|
||||||
|
|
||||||
// TODO: check for watch expired error and retry watch from latest point?
|
|
||||||
done, err := condition(event)
|
done, err := condition(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return lastEvent, err
|
return lastEvent, err
|
||||||
@ -78,10 +78,25 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
|
|||||||
break ConditionSucceeded
|
break ConditionSucceeded
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-after:
|
case <-ctx.Done():
|
||||||
return lastEvent, wait.ErrWaitTimeout
|
return lastEvent, wait.ErrWaitTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return lastEvent, nil
|
return lastEvent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
|
||||||
|
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
|
||||||
|
if timeout < 0 {
|
||||||
|
// This should be handled in validation
|
||||||
|
glog.Errorf("Timeout for context shall not be negative!")
|
||||||
|
timeout = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout == 0 {
|
||||||
|
return context.WithCancel(parent)
|
||||||
|
}
|
||||||
|
|
||||||
|
return context.WithTimeout(parent, timeout)
|
||||||
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package watch
|
package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -25,6 +26,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakePod struct {
|
type fakePod struct {
|
||||||
@ -35,26 +37,26 @@ func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjec
|
|||||||
func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") }
|
func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") }
|
||||||
|
|
||||||
func TestUntil(t *testing.T) {
|
func TestUntil(t *testing.T) {
|
||||||
fw := NewFake()
|
fw := watch.NewFake()
|
||||||
go func() {
|
go func() {
|
||||||
var obj *fakePod
|
var obj *fakePod
|
||||||
fw.Add(obj)
|
fw.Add(obj)
|
||||||
fw.Modify(obj)
|
fw.Modify(obj)
|
||||||
}()
|
}()
|
||||||
conditions := []ConditionFunc{
|
conditions := []ConditionFunc{
|
||||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||||
func(event Event) (bool, error) { return event.Type == Modified, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil },
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := time.Minute
|
ctx, _ := context.WithTimeout(context.Background(), time.Minute)
|
||||||
lastEvent, err := Until(timeout, fw, conditions...)
|
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected nil error, got %#v", err)
|
t.Fatalf("expected nil error, got %#v", err)
|
||||||
}
|
}
|
||||||
if lastEvent == nil {
|
if lastEvent == nil {
|
||||||
t.Fatal("expected an event")
|
t.Fatal("expected an event")
|
||||||
}
|
}
|
||||||
if lastEvent.Type != Modified {
|
if lastEvent.Type != watch.Modified {
|
||||||
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
||||||
}
|
}
|
||||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||||
@ -63,25 +65,25 @@ func TestUntil(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUntilMultipleConditions(t *testing.T) {
|
func TestUntilMultipleConditions(t *testing.T) {
|
||||||
fw := NewFake()
|
fw := watch.NewFake()
|
||||||
go func() {
|
go func() {
|
||||||
var obj *fakePod
|
var obj *fakePod
|
||||||
fw.Add(obj)
|
fw.Add(obj)
|
||||||
}()
|
}()
|
||||||
conditions := []ConditionFunc{
|
conditions := []ConditionFunc{
|
||||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := time.Minute
|
ctx, _ := context.WithTimeout(context.Background(), time.Minute)
|
||||||
lastEvent, err := Until(timeout, fw, conditions...)
|
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected nil error, got %#v", err)
|
t.Fatalf("expected nil error, got %#v", err)
|
||||||
}
|
}
|
||||||
if lastEvent == nil {
|
if lastEvent == nil {
|
||||||
t.Fatal("expected an event")
|
t.Fatal("expected an event")
|
||||||
}
|
}
|
||||||
if lastEvent.Type != Added {
|
if lastEvent.Type != watch.Added {
|
||||||
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
||||||
}
|
}
|
||||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||||
@ -90,26 +92,26 @@ func TestUntilMultipleConditions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUntilMultipleConditionsFail(t *testing.T) {
|
func TestUntilMultipleConditionsFail(t *testing.T) {
|
||||||
fw := NewFake()
|
fw := watch.NewFake()
|
||||||
go func() {
|
go func() {
|
||||||
var obj *fakePod
|
var obj *fakePod
|
||||||
fw.Add(obj)
|
fw.Add(obj)
|
||||||
}()
|
}()
|
||||||
conditions := []ConditionFunc{
|
conditions := []ConditionFunc{
|
||||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||||
func(event Event) (bool, error) { return event.Type == Deleted, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil },
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := 10 * time.Second
|
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
lastEvent, err := Until(timeout, fw, conditions...)
|
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||||
if err != wait.ErrWaitTimeout {
|
if err != wait.ErrWaitTimeout {
|
||||||
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
|
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
|
||||||
}
|
}
|
||||||
if lastEvent == nil {
|
if lastEvent == nil {
|
||||||
t.Fatal("expected an event")
|
t.Fatal("expected an event")
|
||||||
}
|
}
|
||||||
if lastEvent.Type != Added {
|
if lastEvent.Type != watch.Added {
|
||||||
t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
|
t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
|
||||||
}
|
}
|
||||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||||
@ -118,30 +120,29 @@ func TestUntilMultipleConditionsFail(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUntilTimeout(t *testing.T) {
|
func TestUntilTimeout(t *testing.T) {
|
||||||
fw := NewFake()
|
fw := watch.NewFake()
|
||||||
go func() {
|
go func() {
|
||||||
var obj *fakePod
|
var obj *fakePod
|
||||||
fw.Add(obj)
|
fw.Add(obj)
|
||||||
fw.Modify(obj)
|
fw.Modify(obj)
|
||||||
}()
|
}()
|
||||||
conditions := []ConditionFunc{
|
conditions := []ConditionFunc{
|
||||||
func(event Event) (bool, error) {
|
func(event watch.Event) (bool, error) {
|
||||||
return event.Type == Added, nil
|
return event.Type == watch.Added, nil
|
||||||
},
|
},
|
||||||
func(event Event) (bool, error) {
|
func(event watch.Event) (bool, error) {
|
||||||
return event.Type == Modified, nil
|
return event.Type == watch.Modified, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := time.Duration(0)
|
lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...)
|
||||||
lastEvent, err := Until(timeout, fw, conditions...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected nil error, got %#v", err)
|
t.Fatalf("expected nil error, got %#v", err)
|
||||||
}
|
}
|
||||||
if lastEvent == nil {
|
if lastEvent == nil {
|
||||||
t.Fatal("expected an event")
|
t.Fatal("expected an event")
|
||||||
}
|
}
|
||||||
if lastEvent.Type != Modified {
|
if lastEvent.Type != watch.Modified {
|
||||||
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
||||||
}
|
}
|
||||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||||
@ -150,19 +151,20 @@ func TestUntilTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUntilErrorCondition(t *testing.T) {
|
func TestUntilErrorCondition(t *testing.T) {
|
||||||
fw := NewFake()
|
fw := watch.NewFake()
|
||||||
go func() {
|
go func() {
|
||||||
var obj *fakePod
|
var obj *fakePod
|
||||||
fw.Add(obj)
|
fw.Add(obj)
|
||||||
}()
|
}()
|
||||||
expected := "something bad"
|
expected := "something bad"
|
||||||
conditions := []ConditionFunc{
|
conditions := []ConditionFunc{
|
||||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||||
func(event Event) (bool, error) { return false, errors.New(expected) },
|
func(event watch.Event) (bool, error) { return false, errors.New(expected) },
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := time.Minute
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||||
_, err := Until(timeout, fw, conditions...)
|
defer cancel()
|
||||||
|
_, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatal("expected an error")
|
t.Fatal("expected an error")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user