fix watch.until and added listwatch

This commit is contained in:
deads2k
2016-10-04 09:59:13 -04:00
parent 617fa91264
commit c2ed560991
2 changed files with 160 additions and 1 deletions

View File

@@ -19,6 +19,9 @@ package watch
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
@@ -53,7 +56,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
return lastEvent, err
}
if done {
break
continue
}
}
ConditionSucceeded:
@@ -81,3 +84,78 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
}
return lastEvent, nil
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (Interface, error)
}
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...ConditionFunc) (*Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(api.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &Event{Type: Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watch, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
return Until(timeout, watch, remainingConditions...)
}

View File

@@ -23,6 +23,8 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestUntil(t *testing.T) {
@@ -80,6 +82,34 @@ func TestUntilMultipleConditions(t *testing.T) {
}
}
func TestUntilMultipleConditionsFail(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Deleted, nil },
}
timeout := 10 * time.Second
lastEvent, err := Until(timeout, fw, conditions...)
if err != wait.ErrWaitTimeout {
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Added {
t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilTimeout(t *testing.T) {
fw := NewFake()
go func() {
@@ -133,3 +163,54 @@ func TestUntilErrorCondition(t *testing.T) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}
type lw struct {
list runtime.Object
watch Interface
}
func (w lw) List(options api.ListOptions) (runtime.Object, error) {
return w.list, nil
}
func (w lw) Watch(options api.ListOptions) (Interface, error) {
return w.watch, nil
}
func TestListWatchUntil(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Modify(obj)
}()
listwatch := lw{
list: &api.PodList{Items: []api.Pod{{}}},
watch: fw,
}
conditions := []ConditionFunc{
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Added, nil
},
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Modified, nil
},
}
timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}