diff --git a/pkg/watch/until.go b/pkg/watch/until.go index da4ebc49eef..5624d50b1a4 100644 --- a/pkg/watch/until.go +++ b/pkg/watch/until.go @@ -19,6 +19,9 @@ package watch import ( "time" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/wait" ) @@ -53,7 +56,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc return lastEvent, err } if done { - break + continue } } ConditionSucceeded: @@ -81,3 +84,78 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc } return lastEvent, nil } + +// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. +type ListerWatcher interface { + // List should return a list type object; the Items field will be extracted, and the + // ResourceVersion field will be used to start the watch in the right place. + List(options api.ListOptions) (runtime.Object, error) + // Watch should begin a watch at the specified version. + Watch(options api.ListOptions) (Interface, error) +} + +// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. +func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...ConditionFunc) (*Event, error) { + if len(conditions) == 0 { + return nil, nil + } + + list, err := lw.List(api.ListOptions{}) + if err != nil { + return nil, err + } + initialItems, err := meta.ExtractList(list) + if err != nil { + return nil, err + } + + // use the initial items as simulated "adds" + var lastEvent *Event + currIndex := 0 + passedConditions := 0 + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + continue + } + } + + ConditionSucceeded: + for currIndex < len(initialItems) { + lastEvent = &Event{Type: Added, Object: initialItems[currIndex]} + currIndex++ + + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + break ConditionSucceeded + } + } + } + if passedConditions == len(conditions) { + return lastEvent, nil + } + remainingConditions := conditions[passedConditions:] + + metaObj, err := meta.ListAccessor(list) + if err != nil { + return nil, err + } + currResourceVersion := metaObj.GetResourceVersion() + + watch, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion}) + if err != nil { + return nil, err + } + + return Until(timeout, watch, remainingConditions...) +} diff --git a/pkg/watch/until_test.go b/pkg/watch/until_test.go index 1e72f70cc3c..fdb2e4920f3 100644 --- a/pkg/watch/until_test.go +++ b/pkg/watch/until_test.go @@ -23,6 +23,8 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) func TestUntil(t *testing.T) { @@ -80,6 +82,34 @@ func TestUntilMultipleConditions(t *testing.T) { } } +func TestUntilMultipleConditionsFail(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + }() + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return event.Type == Deleted, nil }, + } + + timeout := 10 * time.Second + lastEvent, err := Until(timeout, fw, conditions...) + if err != wait.ErrWaitTimeout { + t.Fatalf("expected ErrWaitTimeout error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != Added { + t.Fatalf("expected ADDED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*api.Pod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + func TestUntilTimeout(t *testing.T) { fw := NewFake() go func() { @@ -133,3 +163,54 @@ func TestUntilErrorCondition(t *testing.T) { t.Fatalf("expected %q in error string, got %q", expected, err.Error()) } } + +type lw struct { + list runtime.Object + watch Interface +} + +func (w lw) List(options api.ListOptions) (runtime.Object, error) { + return w.list, nil +} + +func (w lw) Watch(options api.ListOptions) (Interface, error) { + return w.watch, nil +} + +func TestListWatchUntil(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Modify(obj) + }() + listwatch := lw{ + list: &api.PodList{Items: []api.Pod{{}}}, + watch: fw, + } + + conditions := []ConditionFunc{ + func(event Event) (bool, error) { + t.Logf("got %#v", event) + return event.Type == Added, nil + }, + func(event Event) (bool, error) { + t.Logf("got %#v", event) + return event.Type == Modified, nil + }, + } + + timeout := 10 * time.Second + lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != Modified { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*api.Pod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +}