From 603dd254ac91b3581f9b431ff91a95d929a97e04 Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Mon, 18 Feb 2019 18:20:23 +0100 Subject: [PATCH] Upgrade ListWatchUntil --- pkg/client/tests/listwatch_test.go | 19 +++++++++--- .../src/k8s.io/client-go/tools/watch/until.go | 29 ++++++------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pkg/client/tests/listwatch_test.go b/pkg/client/tests/listwatch_test.go index 35a8a256e44..1adaf67adbf 100644 --- a/pkg/client/tests/listwatch_test.go +++ b/pkg/client/tests/listwatch_test.go @@ -17,6 +17,7 @@ limitations under the License. package tests import ( + "context" "net/http/httptest" "net/url" "testing" @@ -194,11 +195,20 @@ func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) { func TestListWatchUntil(t *testing.T) { fw := watch.NewFake() go func() { - var obj *v1.Pod + obj := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + }, + } fw.Modify(obj) }() listwatch := lw{ - list: &v1.PodList{Items: []v1.Pod{{}}}, + list: &v1.PodList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "1", + }, + Items: []v1.Pod{{}}, + }, watch: fw, } @@ -213,8 +223,9 @@ func TestListWatchUntil(t *testing.T) { }, } - timeout := 10 * time.Second - lastEvent, err := watchtools.ListWatchUntil(timeout, listwatch, conditions...) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + lastEvent, err := watchtools.ListWatchUntil(ctx, listwatch, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index 0a341a444fc..e12d82aca48 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -168,13 +168,14 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) ( return context.WithTimeout(parent, timeout) } -// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout -// if timeout is exceeded without all conditions returning true, or an error if an error occurs. -// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. -// TODO: remove when no longer used -// -// Deprecated: Use UntilWithSync instead. -func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) { +// ListWatchUntil first lists objects, converts them into synthetic ADDED events +// and checks conditions for those synthetic events. If the conditions have not been reached so far +// it continues by calling Until which establishes a watch from resourceVersion of the list call +// to evaluate those conditions based on new events. +// ListWatchUntil provides the same guarantees as Until and replaces the old WATCH from RV "" (or "0") +// which was mixing list and watch calls internally and having severe design issues. (see #74022) +// There is no resourceVersion order guarantee for the initial list and those synthetic events. +func ListWatchUntil(ctx context.Context, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) { if len(conditions) == 0 { return nil, nil } @@ -231,17 +232,5 @@ func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions .. } currResourceVersion := metaObj.GetResourceVersion() - watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion}) - if err != nil { - return nil, err - } - - ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() - evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...) - if err == ErrWatchClosed { - // present a consistent error interface to callers - err = wait.ErrWaitTimeout - } - return evt, err + return Until(ctx, currResourceVersion, lw, remainingConditions...) }