From ead12b02fed9e8c3a8ebf693f71fa452115d923b Mon Sep 17 00:00:00 2001 From: wojtekt Date: Thu, 7 May 2020 20:21:46 +0200 Subject: [PATCH] Remove ListWatchUntil --- pkg/client/tests/BUILD | 2 - pkg/client/tests/listwatch_test.go | 67 ------------------ .../src/k8s.io/client-go/tools/watch/BUILD | 1 - .../src/k8s.io/client-go/tools/watch/until.go | 69 ------------------- test/e2e/apps/statefulset.go | 19 +++-- 5 files changed, 13 insertions(+), 145 deletions(-) diff --git a/pkg/client/tests/BUILD b/pkg/client/tests/BUILD index 4c3982b2864..1d997a32e0f 100644 --- a/pkg/client/tests/BUILD +++ b/pkg/client/tests/BUILD @@ -29,14 +29,12 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/portforward:go_default_library", "//staging/src/k8s.io/client-go/tools/remotecommand:go_default_library", - "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/client-go/transport/spdy:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", diff --git a/pkg/client/tests/listwatch_test.go b/pkg/client/tests/listwatch_test.go index 40c1b9f8c4d..3be80bef786 100644 --- a/pkg/client/tests/listwatch_test.go +++ b/pkg/client/tests/listwatch_test.go @@ -17,22 +17,16 @@ limitations under the License. package tests import ( - "context" "net/http/httptest" "net/url" "testing" - "time" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" . "k8s.io/client-go/tools/cache" - watchtools "k8s.io/client-go/tools/watch" utiltesting "k8s.io/client-go/util/testing" ) @@ -176,64 +170,3 @@ func TestListWatchesCanWatch(t *testing.T) { handler.ValidateRequest(t, item.location, "GET", nil) } } - -type lw struct { - list runtime.Object - watch watch.Interface -} - -func (w lw) List(options metav1.ListOptions) (runtime.Object, error) { - return w.list, nil -} - -func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) { - return w.watch, nil -} - -func TestListWatchUntil(t *testing.T) { - fw := watch.NewFake() - go func() { - obj := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "2", - }, - } - fw.Modify(obj) - }() - listwatch := lw{ - list: &v1.PodList{ - ListMeta: metav1.ListMeta{ - ResourceVersion: "1", - }, - Items: []v1.Pod{{}}, - }, - watch: fw, - } - - conditions := []watchtools.ConditionFunc{ - func(event watch.Event) (bool, error) { - t.Logf("got %#v", event) - return event.Type == watch.Added, nil - }, - func(event watch.Event) (bool, error) { - t.Logf("got %#v", event) - return event.Type == watch.Modified, nil - }, - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - lastEvent, err := watchtools.ListWatchUntil(ctx, listwatch, conditions...) - if err != nil { - t.Fatalf("expected nil error, got %#v", err) - } - if lastEvent == nil { - t.Fatal("expected an event") - } - if lastEvent.Type != watch.Modified { - t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) - } - if got, isPod := lastEvent.Object.(*v1.Pod); !isPod { - t.Fatalf("expected a pod event, got %#v", got) - } -} diff --git a/staging/src/k8s.io/client-go/tools/watch/BUILD b/staging/src/k8s.io/client-go/tools/watch/BUILD index f31c836986e..09b26b47d1c 100644 --- a/staging/src/k8s.io/client-go/tools/watch/BUILD +++ b/staging/src/k8s.io/client-go/tools/watch/BUILD @@ -12,7 +12,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index e12d82aca48..37a46da49e6 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -22,8 +22,6 @@ import ( "fmt" "time" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -167,70 +165,3 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) ( return context.WithTimeout(parent, timeout) } - -// ListWatchUntil first lists objects, converts them into synthetic ADDED events -// and checks conditions for those synthetic events. If the conditions have not been reached so far -// it continues by calling Until which establishes a watch from resourceVersion of the list call -// to evaluate those conditions based on new events. -// ListWatchUntil provides the same guarantees as Until and replaces the old WATCH from RV "" (or "0") -// which was mixing list and watch calls internally and having severe design issues. (see #74022) -// There is no resourceVersion order guarantee for the initial list and those synthetic events. -func ListWatchUntil(ctx context.Context, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) { - if len(conditions) == 0 { - return nil, nil - } - - list, err := lw.List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - initialItems, err := meta.ExtractList(list) - if err != nil { - return nil, err - } - - // use the initial items as simulated "adds" - var lastEvent *watch.Event - currIndex := 0 - passedConditions := 0 - for _, condition := range conditions { - // check the next condition against the previous event and short circuit waiting for the next watch - if lastEvent != nil { - done, err := condition(*lastEvent) - if err != nil { - return lastEvent, err - } - if done { - passedConditions = passedConditions + 1 - continue - } - } - - ConditionSucceeded: - for currIndex < len(initialItems) { - lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]} - currIndex++ - - done, err := condition(*lastEvent) - if err != nil { - return lastEvent, err - } - if done { - passedConditions = passedConditions + 1 - break ConditionSucceeded - } - } - } - if passedConditions == len(conditions) { - return lastEvent, nil - } - remainingConditions := conditions[passedConditions:] - - metaObj, err := meta.ListAccessor(list) - if err != nil { - return nil, err - } - currResourceVersion := metaObj.GetResourceVersion() - - return Until(ctx, currResourceVersion, lw, remainingConditions...) -} diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index b5e938350bb..bf7cd5226d9 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -30,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" klabels "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" @@ -748,12 +747,20 @@ var _ = SIGDescribe("StatefulSet", func() { var initialStatefulPodUID types.UID ginkgo.By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name) + fieldSelector := fields.OneTermEqualSelector("metadata.name", statefulPodName).String() + pl, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{ + FieldSelector: fieldSelector, + }) + framework.ExpectNoError(err) + if len(pl.Items) > 0 { + pod := pl.Items[0] + framework.Logf("Observed stateful pod in namespace: %v, name: %v, uid: %v, status phase: %v. Waiting for statefulset controller to delete.", + pod.Namespace, pod.Name, pod.UID, pod.Status.Phase) + initialStatefulPodUID = pod.UID + } + lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) { - options.FieldSelector = fieldSelector - return f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), options) - }, WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { options.FieldSelector = fieldSelector return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), options) @@ -762,7 +769,7 @@ var _ = SIGDescribe("StatefulSet", func() { ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulPodTimeout) defer cancel() // we need to get UID from pod in any state and wait until stateful set controller will remove pod at least once - _, err = watchtools.ListWatchUntil(ctx, lw, func(event watch.Event) (bool, error) { + _, err = watchtools.Until(ctx, pl.ResourceVersion, lw, func(event watch.Event) (bool, error) { pod := event.Object.(*v1.Pod) switch event.Type { case watch.Deleted: