From cbdb98d74d1e411a1e2277864fc90a1131252c93 Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Thu, 2 Aug 2018 14:11:59 +0200 Subject: [PATCH] Rename Until to UntilWithoutRetry and move to using context so it's cancelable Kubernetes-commit: 3d4a02abb54244861f9f05b8db2fdfdaa2c6f67c --- tools/cache/listwatch.go | 9 ++++-- tools/watch/until.go | 49 +++++++++++++++++++---------- tools/watch/until_test.go | 66 ++++++++++++++++++++------------------- 3 files changed, 72 insertions(+), 52 deletions(-) diff --git a/tools/cache/listwatch.go b/tools/cache/listwatch.go index 8bf41f51..30463aea 100644 --- a/tools/cache/listwatch.go +++ b/tools/cache/listwatch.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/pager" + watchtools "k8s.io/client-go/tools/watch" ) // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. @@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) // ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout // if timeout is exceeded without all conditions returning true, or an error if an error occurs. // TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. -func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) { +func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { if len(conditions) == 0 { return nil, nil } @@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch return nil, err } - evt, err := watch.Until(timeout, watchInterface, remainingConditions...) - if err == watch.ErrWatchClosed { + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...) + if err == watchtools.ErrWatchClosed { // present a consistent error interface to callers err = wait.ErrWaitTimeout } diff --git a/tools/watch/until.go b/tools/watch/until.go index c2772ddb..4a891b23 100644 --- a/tools/watch/until.go +++ b/tools/watch/until.go @@ -17,38 +17,39 @@ limitations under the License. package watch import ( + "context" "errors" "time" + "github.com/golang/glog" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" ) // ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, // or an error if the condition cannot be checked and should terminate. In general, it is better to define // level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed // from false to true). -type ConditionFunc func(event Event) (bool, error) +type ConditionFunc func(event watch.Event) (bool, error) -// ErrWatchClosed is returned when the watch channel is closed before timeout in Until. -var ErrWatchClosed = errors.New("watch closed before Until timeout") +// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry. +var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout") -// Until reads items from the watch until each provided condition succeeds, and then returns the last watch +// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch // encountered. The first condition that returns an error terminates the watch (and the event is also returned). // If no event has been received, the returned event will be nil. // Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. -// A zero timeout means to wait forever. -func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) { +// Waits until context deadline or until context is canceled. +// +// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!! +// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error. +// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below, +// Warning: solving such issues. +// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone. +func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) { ch := watcher.ResultChan() defer watcher.Stop() - var after <-chan time.Time - if timeout > 0 { - after = time.After(timeout) - } else { - ch := make(chan time.Time) - defer close(ch) - after = ch - } - var lastEvent *Event + var lastEvent *watch.Event for _, condition := range conditions { // check the next condition against the previous event and short circuit waiting for the next watch if lastEvent != nil { @@ -69,7 +70,6 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc } lastEvent = &event - // TODO: check for watch expired error and retry watch from latest point? done, err := condition(event) if err != nil { return lastEvent, err @@ -78,10 +78,25 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc break ConditionSucceeded } - case <-after: + case <-ctx.Done(): return lastEvent, wait.ErrWaitTimeout } } } return lastEvent, nil } + +// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. +func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout < 0 { + // This should be handled in validation + glog.Errorf("Timeout for context shall not be negative!") + timeout = 0 + } + + if timeout == 0 { + return context.WithCancel(parent) + } + + return context.WithTimeout(parent, timeout) +} diff --git a/tools/watch/until_test.go b/tools/watch/until_test.go index e872c368..e766acd7 100644 --- a/tools/watch/until_test.go +++ b/tools/watch/until_test.go @@ -17,6 +17,7 @@ limitations under the License. package watch import ( + "context" "errors" "strings" "testing" @@ -25,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" ) type fakePod struct { @@ -35,26 +37,26 @@ func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjec func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") } func TestUntil(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) fw.Modify(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Modified, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil }, } - timeout := time.Minute - lastEvent, err := Until(timeout, fw, conditions...) + ctx, _ := context.WithTimeout(context.Background(), time.Minute) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Modified { + if lastEvent.Type != watch.Modified { t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -63,25 +65,25 @@ func TestUntil(t *testing.T) { } func TestUntilMultipleConditions(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, } - timeout := time.Minute - lastEvent, err := Until(timeout, fw, conditions...) + ctx, _ := context.WithTimeout(context.Background(), time.Minute) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Added { + if lastEvent.Type != watch.Added { t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -90,26 +92,26 @@ func TestUntilMultipleConditions(t *testing.T) { } func TestUntilMultipleConditionsFail(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Deleted, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil }, } - timeout := 10 * time.Second - lastEvent, err := Until(timeout, fw, conditions...) + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) if err != wait.ErrWaitTimeout { t.Fatalf("expected ErrWaitTimeout error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Added { + if lastEvent.Type != watch.Added { t.Fatalf("expected ADDED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -118,30 +120,29 @@ func TestUntilMultipleConditionsFail(t *testing.T) { } func TestUntilTimeout(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) fw.Modify(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { - return event.Type == Added, nil + func(event watch.Event) (bool, error) { + return event.Type == watch.Added, nil }, - func(event Event) (bool, error) { - return event.Type == Modified, nil + func(event watch.Event) (bool, error) { + return event.Type == watch.Modified, nil }, } - timeout := time.Duration(0) - lastEvent, err := Until(timeout, fw, conditions...) + lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Modified { + if lastEvent.Type != watch.Modified { t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -150,19 +151,20 @@ func TestUntilTimeout(t *testing.T) { } func TestUntilErrorCondition(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) }() expected := "something bad" conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return false, errors.New(expected) }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return false, errors.New(expected) }, } - timeout := time.Minute - _, err := Until(timeout, fw, conditions...) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + _, err := UntilWithoutRetry(ctx, fw, conditions...) if err == nil { t.Fatal("expected an error") }