diff --git a/pkg/client/cache/listwatch.go b/pkg/client/cache/listwatch.go
index ff56c0b7bf4..3956ccb1e25 100644
--- a/pkg/client/cache/listwatch.go
+++ b/pkg/client/cache/listwatch.go
@@ -20,12 +20,22 @@ import (
 	"time"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/meta"
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/watch"
 )
 
+// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
+type ListerWatcher interface {
+	// List should return a list type object; the Items field will be extracted, and the
+	// ResourceVersion field will be used to start the watch in the right place.
+	List(options api.ListOptions) (runtime.Object, error)
+	// Watch should begin a watch at the specified version.
+	Watch(options api.ListOptions) (watch.Interface, error)
+}
+
 // ListFunc knows how to list resources
 type ListFunc func(options api.ListOptions) (runtime.Object, error)
@@ -84,3 +94,69 @@ func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) {
 func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) {
 	return lw.WatchFunc(options)
 }
+
+// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
+func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
+	if len(conditions) == 0 {
+		return nil, nil
+	}
+
+	list, err := lw.List(api.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	initialItems, err := meta.ExtractList(list)
+	if err != nil {
+		return nil, err
+	}
+
+	// use the initial items as simulated "adds"
+	var lastEvent *watch.Event
+	currIndex := 0
+	passedConditions := 0
+	for _, condition := range conditions {
+		// check the next condition against the previous event and short circuit waiting for the next watch
+		if lastEvent != nil {
+			done, err := condition(*lastEvent)
+			if err != nil {
+				return lastEvent, err
+			}
+			if done {
+				passedConditions = passedConditions + 1
+				continue
+			}
+		}
+
+	ConditionSucceeded:
+		for currIndex < len(initialItems) {
+			lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
+			currIndex++
+
+			done, err := condition(*lastEvent)
+			if err != nil {
+				return lastEvent, err
+			}
+			if done {
+				passedConditions = passedConditions + 1
+				break ConditionSucceeded
+			}
+		}
+	}
+	if passedConditions == len(conditions) {
+		return lastEvent, nil
+	}
+	remainingConditions := conditions[passedConditions:]
+
+	metaObj, err := meta.ListAccessor(list)
+	if err != nil {
+		return nil, err
+	}
+	currResourceVersion := metaObj.GetResourceVersion()
+
+	watchInterface, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
+	if err != nil {
+		return nil, err
+	}
+
+	return watch.Until(timeout, watchInterface, remainingConditions...)
+}
diff --git a/pkg/client/cache/listwatch_test.go b/pkg/client/cache/listwatch_test.go
index b141ecd67a4..1766bcf43eb 100644
--- a/pkg/client/cache/listwatch_test.go
+++ b/pkg/client/cache/listwatch_test.go
@@ -20,6 +20,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"testing"
+	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/testapi"
@@ -28,7 +29,9 @@ import (
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/runtime"
 	utiltesting "k8s.io/kubernetes/pkg/util/testing"
+	"k8s.io/kubernetes/pkg/watch"
 )
 
 func parseSelectorOrDie(s string) fields.Selector {
@@ -171,3 +174,54 @@ func TestListWatchesCanWatch(t *testing.T) {
 		handler.ValidateRequest(t, item.location, "GET", nil)
 	}
 }
+
+type lw struct {
+	list  runtime.Object
+	watch watch.Interface
+}
+
+func (w lw) List(options api.ListOptions) (runtime.Object, error) {
+	return w.list, nil
+}
+
+func (w lw) Watch(options api.ListOptions) (watch.Interface, error) {
+	return w.watch, nil
+}
+
+func TestListWatchUntil(t *testing.T) {
+	fw := watch.NewFake()
+	go func() {
+		var obj *api.Pod
+		fw.Modify(obj)
+	}()
+	listwatch := lw{
+		list:  &api.PodList{Items: []api.Pod{{}}},
+		watch: fw,
+	}
+
+	conditions := []watch.ConditionFunc{
+		func(event watch.Event) (bool, error) {
+			t.Logf("got %#v", event)
+			return event.Type == watch.Added, nil
+		},
+		func(event watch.Event) (bool, error) {
+			t.Logf("got %#v", event)
+			return event.Type == watch.Modified, nil
+		},
+	}
+
+	timeout := 10 * time.Second
+	lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
+	if err != nil {
+		t.Fatalf("expected nil error, got %#v", err)
+	}
+	if lastEvent == nil {
+		t.Fatal("expected an event")
+	}
+	if lastEvent.Type != watch.Modified {
+		t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
+	}
+	if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
+		t.Fatalf("expected a pod event, got %#v", got)
+	}
+}
diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go
index fee51a11068..745ee76240f 100644
--- a/pkg/client/cache/reflector.go
+++ b/pkg/client/cache/reflector.go
@@ -43,15 +43,6 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )
 
-// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
-type ListerWatcher interface {
-	// List should return a list type object; the Items field will be extracted, and the
-	// ResourceVersion field will be used to start the watch in the right place.
-	List(options api.ListOptions) (runtime.Object, error)
-	// Watch should begin a watch at the specified version.
-	Watch(options api.ListOptions) (watch.Interface, error)
-}
-
 // Reflector watches a specified resource and causes all changes to be reflected in the given store.
 type Reflector struct {
 	// name identifies this reflector. By default it will be a file:line if possible.
diff --git a/pkg/controller/client_builder.go b/pkg/controller/client_builder.go
index d5e1aca3737..88f9c97c1ed 100644
--- a/pkg/controller/client_builder.go
+++ b/pkg/controller/client_builder.go
@@ -119,7 +119,7 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro
 			return b.CoreClient.Secrets(b.Namespace).Watch(options)
 		},
 	}
-	_, err = watch.ListWatchUntil(30*time.Second, lw,
+	_, err = cache.ListWatchUntil(30*time.Second, lw,
 		func(event watch.Event) (bool, error) {
 			switch event.Type {
 			case watch.Deleted:
diff --git a/pkg/watch/BUILD b/pkg/watch/BUILD
index a30de1a75d7..8a8a2503274 100644
--- a/pkg/watch/BUILD
+++ b/pkg/watch/BUILD
@@ -22,8 +22,6 @@ go_library(
     ],
     tags = ["automanaged"],
     deps = [
-        "//pkg/api:go_default_library",
-        "//pkg/api/meta:go_default_library",
         "//pkg/api/unversioned:go_default_library",
         "//pkg/runtime:go_default_library",
         "//pkg/util/net:go_default_library",
diff --git a/pkg/watch/until.go b/pkg/watch/until.go
index 5624d50b1a4..17970b021d9 100644
--- a/pkg/watch/until.go
+++ b/pkg/watch/until.go
@@ -19,9 +19,6 @@ package watch
 import (
 	"time"
 
-	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/api/meta"
-	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
 
@@ -84,78 +81,3 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
 	}
 	return lastEvent, nil
 }
-
-// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
-type ListerWatcher interface {
-	// List should return a list type object; the Items field will be extracted, and the
-	// ResourceVersion field will be used to start the watch in the right place.
-	List(options api.ListOptions) (runtime.Object, error)
-	// Watch should begin a watch at the specified version.
-	Watch(options api.ListOptions) (Interface, error)
-}
-
-// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
-func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...ConditionFunc) (*Event, error) {
-	if len(conditions) == 0 {
-		return nil, nil
-	}
-
-	list, err := lw.List(api.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-	initialItems, err := meta.ExtractList(list)
-	if err != nil {
-		return nil, err
-	}
-
-	// use the initial items as simulated "adds"
-	var lastEvent *Event
-	currIndex := 0
-	passedConditions := 0
-	for _, condition := range conditions {
-		// check the next condition against the previous event and short circuit waiting for the next watch
-		if lastEvent != nil {
-			done, err := condition(*lastEvent)
-			if err != nil {
-				return lastEvent, err
-			}
-			if done {
-				passedConditions = passedConditions + 1
-				continue
-			}
-		}
-
-	ConditionSucceeded:
-		for currIndex < len(initialItems) {
-			lastEvent = &Event{Type: Added, Object: initialItems[currIndex]}
-			currIndex++
-
-			done, err := condition(*lastEvent)
-			if err != nil {
-				return lastEvent, err
-			}
-			if done {
-				passedConditions = passedConditions + 1
-				break ConditionSucceeded
-			}
-		}
-	}
-	if passedConditions == len(conditions) {
-		return lastEvent, nil
-	}
-	remainingConditions := conditions[passedConditions:]
-
-	metaObj, err := meta.ListAccessor(list)
-	if err != nil {
-		return nil, err
-	}
-	currResourceVersion := metaObj.GetResourceVersion()
-
-	watch, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
-	if err != nil {
-		return nil, err
-	}
-
-	return Until(timeout, watch, remainingConditions...)
-}
diff --git a/pkg/watch/until_test.go b/pkg/watch/until_test.go
index fdb2e4920f3..4f6ec981ff8 100644
--- a/pkg/watch/until_test.go
+++ b/pkg/watch/until_test.go
@@ -23,7 +23,6 @@ import (
 	"time"
 
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
 
@@ -163,54 +162,3 @@ func TestUntilErrorCondition(t *testing.T) {
 		t.Fatalf("expected %q in error string, got %q", expected, err.Error())
 	}
 }
-
-type lw struct {
-	list  runtime.Object
-	watch Interface
-}
-
-func (w lw) List(options api.ListOptions) (runtime.Object, error) {
-	return w.list, nil
-}
-
-func (w lw) Watch(options api.ListOptions) (Interface, error) {
-	return w.watch, nil
-}
-
-func TestListWatchUntil(t *testing.T) {
-	fw := NewFake()
-	go func() {
-		var obj *api.Pod
-		fw.Modify(obj)
-	}()
-	listwatch := lw{
-		list:  &api.PodList{Items: []api.Pod{{}}},
-		watch: fw,
-	}
-
-	conditions := []ConditionFunc{
-		func(event Event) (bool, error) {
-			t.Logf("got %#v", event)
-			return event.Type == Added, nil
-		},
-		func(event Event) (bool, error) {
-			t.Logf("got %#v", event)
-			return event.Type == Modified, nil
-		},
-	}
-
-	timeout := 10 * time.Second
-	lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
-	if err != nil {
-		t.Fatalf("expected nil error, got %#v", err)
-	}
-	if lastEvent == nil {
-		t.Fatal("expected an event")
-	}
-	if lastEvent.Type != Modified {
-		t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
-	}
-	if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
-		t.Fatalf("expected a pod event, got %#v", got)
-	}
-}
diff --git a/test/test_owners.csv b/test/test_owners.csv
index cb2a50e2aa4..abaa63625b1 100644
--- a/test/test_owners.csv
+++ b/test/test_owners.csv
@@ -282,13 +282,13 @@ Opaque resources should account opaque integer resources in pods with multiple c
 Opaque resources should not break pods that do not consume opaque integer resources.,ConnorDoyle,0
 Opaque resources should not schedule pods that exceed the available amount of opaque integer resource.,ConnorDoyle,0
 Opaque resources should schedule pods that do consume opaque integer resources.,ConnorDoyle,0
-PersistentVolumes with Single PV - PVC pairs create a PV and a pre-bound PVC: test write access,roberthbailey,1
-PersistentVolumes with Single PV - PVC pairs create a PVC and a pre-bound PV: test write access,piosz,1
-PersistentVolumes with Single PV - PVC pairs create a PVC and non-pre-bound PV: test write access,sttts,1
-PersistentVolumes with Single PV - PVC pairs should create a non-pre-bound PV and PVC: test write access,yujuhong,1
-PersistentVolumes with multiple PVs and PVCs all in same ns should create 2 PVs and 4 PVCs: test write access,brendandburns,1
-PersistentVolumes with multiple PVs and PVCs all in same ns should create 3 PVs and 3 PVCs: test write access,hurf,1
-PersistentVolumes with multiple PVs and PVCs all in same ns should create 4 PVs and 2 PVCs: test write access,jlowdermilk,1
+PersistentVolumes with Single PV - PVC pairs create a PV and a pre-bound PVC: test write access,caesarxuchao,1
+PersistentVolumes with Single PV - PVC pairs create a PVC and a pre-bound PV: test write access,caesarxuchao,1
+PersistentVolumes with Single PV - PVC pairs create a PVC and non-pre-bound PV: test write access,caesarxuchao,1
+PersistentVolumes with Single PV - PVC pairs should create a non-pre-bound PV and PVC: test write access,caesarxuchao,1
+PersistentVolumes with multiple PVs and PVCs all in same ns should create 2 PVs and 4 PVCs: test write access,caesarxuchao,1
+PersistentVolumes with multiple PVs and PVCs all in same ns should create 3 PVs and 3 PVCs: test write access,caesarxuchao,1
+PersistentVolumes with multiple PVs and PVCs all in same ns should create 4 PVs and 2 PVCs: test write access,caesarxuchao,1
 Pet Store should scale to persist a nominal number ( * ) of transactions in * seconds,xiang90,1
 Pet set recreate should recreate evicted statefulset,roberthbailey,1
 "Pod Disks Should schedule a pod w/ a RW PD, gracefully remove it, then schedule it on another host",alex-mohr,1
@@ -453,10 +453,7 @@ k8s.io/kubernetes/cmd/kube-apiserver/app,nikhiljindal,0
 k8s.io/kubernetes/cmd/kube-apiserver/app/options,nikhiljindal,0
 k8s.io/kubernetes/cmd/kube-discovery/app,pmorie,1
 k8s.io/kubernetes/cmd/kube-proxy/app,luxas,1
-k8s.io/kubernetes/cmd/kubeadm/app/cmd,davidopp,1
-k8s.io/kubernetes/cmd/kubeadm/app/images,saad-ali,1
-k8s.io/kubernetes/cmd/kubeadm/app/util,eparis,1
-k8s.io/kubernetes/cmd/kubeadm/app/cmd,vishh,1
+k8s.io/kubernetes/cmd/kubeadm/app/cmd,caesarxuchao,1
 k8s.io/kubernetes/cmd/kubeadm/app/images,davidopp,1
 k8s.io/kubernetes/cmd/kubeadm/app/preflight,apprenda,0
 k8s.io/kubernetes/cmd/kubeadm/app/util,krousey,1
@@ -631,9 +628,9 @@ k8s.io/kubernetes/pkg/kubelet/qos,vishh,0
 k8s.io/kubernetes/pkg/kubelet/rkt,apelisse,1
 k8s.io/kubernetes/pkg/kubelet/rktshim,mml,1
 k8s.io/kubernetes/pkg/kubelet/server,timstclair,0
-k8s.io/kubernetes/pkg/kubelet/server/portforward,saad-ali,1
+k8s.io/kubernetes/pkg/kubelet/server/portforward,caesarxuchao,1
 k8s.io/kubernetes/pkg/kubelet/server/stats,timstclair,0
-k8s.io/kubernetes/pkg/kubelet/server/streaming,derekwaynecarr,1
+k8s.io/kubernetes/pkg/kubelet/server/streaming,caesarxuchao,1
 k8s.io/kubernetes/pkg/kubelet/status,mwielgus,1
 k8s.io/kubernetes/pkg/kubelet/sysctl,piosz,1
 k8s.io/kubernetes/pkg/kubelet/types,jlowdermilk,1
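Usage sketch (not part of the patch above): a minimal example of how a caller might use the relocated cache.ListWatchUntil with a cache.ListWatch, mirroring the client_builder.go call site. The clientset accessor (c.Core().Pods), the "default" namespace, the 30-second timeout, and the added-or-modified condition are illustrative assumptions, not code from this change.

// Illustrative sketch only; see the hedges above.
package example

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/watch"
)

func waitForPodEvent(c clientset.Interface) error {
	// cache.ListWatch satisfies the ListerWatcher interface that now lives in pkg/client/cache.
	lw := &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			return c.Core().Pods(api.NamespaceDefault).List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			return c.Core().Pods(api.NamespaceDefault).Watch(options)
		},
	}

	// ListWatchUntil replays the initial list as simulated Added events, then watches
	// from the list's resourceVersion until the remaining conditions are satisfied.
	lastEvent, err := cache.ListWatchUntil(30*time.Second, lw,
		func(event watch.Event) (bool, error) {
			return event.Type == watch.Added || event.Type == watch.Modified, nil
		},
	)
	if err != nil {
		return err
	}
	fmt.Printf("observed %v event on %T\n", lastEvent.Type, lastEvent.Object)
	return nil
}

Building the ListerWatcher from closures keeps the caller decoupled from any particular resource client, which is why moving the interface next to ListWatch in pkg/client/cache (rather than leaving it in pkg/watch) avoids the api/meta dependency that the BUILD change removes.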