Remove ListWatchUntil

This commit is contained in:
wojtekt 2020-05-07 20:21:46 +02:00
parent af67408c17
commit ead12b02fe
5 changed files with 13 additions and 145 deletions

View File

@ -29,14 +29,12 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/portforward:go_default_library",
"//staging/src/k8s.io/client-go/tools/remotecommand:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/client-go/transport/spdy:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",

View File

@ -17,22 +17,16 @@ limitations under the License.
package tests
import (
"context"
"net/http/httptest"
"net/url"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
. "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
utiltesting "k8s.io/client-go/util/testing"
)
@ -176,64 +170,3 @@ func TestListWatchesCanWatch(t *testing.T) {
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
type lw struct {
list runtime.Object
watch watch.Interface
}
func (w lw) List(options metav1.ListOptions) (runtime.Object, error) {
return w.list, nil
}
func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) {
return w.watch, nil
}
func TestListWatchUntil(t *testing.T) {
fw := watch.NewFake()
go func() {
obj := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
}
fw.Modify(obj)
}()
listwatch := lw{
list: &v1.PodList{
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: []v1.Pod{{}},
},
watch: fw,
}
conditions := []watchtools.ConditionFunc{
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Added, nil
},
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Modified, nil
},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
lastEvent, err := watchtools.ListWatchUntil(ctx, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*v1.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}

View File

@ -12,7 +12,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",

View File

@ -22,8 +22,6 @@ import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -167,70 +165,3 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (
return context.WithTimeout(parent, timeout)
}
// ListWatchUntil first lists objects, converts them into synthetic ADDED events
// and checks conditions for those synthetic events. If the conditions have not been reached so far
// it continues by calling Until which establishes a watch from resourceVersion of the list call
// to evaluate those conditions based on new events.
// ListWatchUntil provides the same guarantees as Until and replaces the old WATCH from RV "" (or "0")
// which was mixing list and watch calls internally and having severe design issues. (see #74022)
// There is no resourceVersion order guarantee for the initial list and those synthetic events.
func ListWatchUntil(ctx context.Context, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
return Until(ctx, currResourceVersion, lw, remainingConditions...)
}

View File

@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
@ -748,12 +747,20 @@ var _ = SIGDescribe("StatefulSet", func() {
var initialStatefulPodUID types.UID
ginkgo.By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name)
fieldSelector := fields.OneTermEqualSelector("metadata.name", statefulPodName).String()
pl, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
FieldSelector: fieldSelector,
})
framework.ExpectNoError(err)
if len(pl.Items) > 0 {
pod := pl.Items[0]
framework.Logf("Observed stateful pod in namespace: %v, name: %v, uid: %v, status phase: %v. Waiting for statefulset controller to delete.",
pod.Namespace, pod.Name, pod.UID, pod.Status.Phase)
initialStatefulPodUID = pod.UID
}
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
options.FieldSelector = fieldSelector
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), options)
@ -762,7 +769,7 @@ var _ = SIGDescribe("StatefulSet", func() {
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulPodTimeout)
defer cancel()
// we need to get UID from pod in any state and wait until stateful set controller will remove pod at least once
_, err = watchtools.ListWatchUntil(ctx, lw, func(event watch.Event) (bool, error) {
_, err = watchtools.Until(ctx, pl.ResourceVersion, lw, func(event watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
switch event.Type {
case watch.Deleted: