Merge pull request #35673 from caesarxuchao/move-ListWatchUntil

Automatic merge from submit-queue

Minor change: move watch.ListWatchUntil to its own package

[ListWatcher](https://github.com/kubernetes/kubernetes/blob/master/pkg/watch/until.go#L89) will be changed from using api.ListOptions to v1.ListOptions, to be compatible with the versioned clientset. This *will* cause an import cycle: v1 [imports](https://github.com/kubernetes/kubernetes/blob/master/pkg/api/v1/register.go#L91) watch/version, which imports watch, which would in turn import v1. This PR breaks the potential cycle.

This will make the migration to client-go a little easier.
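
For reference, a minimal usage sketch of the helper at its new home in `pkg/client/cache`. It is adapted from the test added in this PR; the `fakeLW` type and the `main` wrapper are illustrative additions rather than code from the PR, and the package paths reflect the pre-client-go internal tree at the time of this change.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/watch"
)

// fakeLW is an illustrative ListerWatcher backed by a canned list and a fake watcher,
// mirroring the lw helper in the test below.
type fakeLW struct {
	list  runtime.Object
	watch watch.Interface
}

func (f fakeLW) List(options api.ListOptions) (runtime.Object, error) {
	return f.list, nil
}

func (f fakeLW) Watch(options api.ListOptions) (watch.Interface, error) {
	return f.watch, nil
}

func main() {
	fw := watch.NewFake()
	go fw.Modify(&api.Pod{}) // deliver a single MODIFIED event on the fake watch

	lw := fakeLW{
		list:  &api.PodList{Items: []api.Pod{{}}},
		watch: fw,
	}

	// Note the package change: watch.ListWatchUntil becomes cache.ListWatchUntil.
	// The initial list is replayed as ADDED events, then the watch is consumed
	// until the condition passes or the timeout expires.
	lastEvent, err := cache.ListWatchUntil(10*time.Second, lw,
		func(event watch.Event) (bool, error) {
			return event.Type == watch.Modified, nil
		},
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("observed event:", lastEvent.Type)
}
```

Callers in the main tree, such as SAControllerClientBuilder in the diff below, switch from `watch.ListWatchUntil` to `cache.ListWatchUntil` in exactly this way.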
Kubernetes Submit Queue 2016-10-30 16:40:02 -07:00 committed by GitHub
commit 8ca348a7a0
8 changed files with 141 additions and 155 deletions

View File

@@ -20,12 +20,22 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (watch.Interface, error)
}
// ListFunc knows how to list resources
type ListFunc func(options api.ListOptions) (runtime.Object, error)
@@ -84,3 +94,69 @@ func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) {
func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(api.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watchInterface, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
return watch.Until(timeout, watchInterface, remainingConditions...)
}

View File

@@ -20,6 +20,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
@@ -28,7 +29,9 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/watch"
)
func parseSelectorOrDie(s string) fields.Selector {
@@ -171,3 +174,54 @@ func TestListWatchesCanWatch(t *testing.T) {
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
type lw struct {
list runtime.Object
watch watch.Interface
}
func (w lw) List(options api.ListOptions) (runtime.Object, error) {
return w.list, nil
}
func (w lw) Watch(options api.ListOptions) (watch.Interface, error) {
return w.watch, nil
}
func TestListWatchUntil(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *api.Pod
fw.Modify(obj)
}()
listwatch := lw{
list: &api.PodList{Items: []api.Pod{{}}},
watch: fw,
}
conditions := []watch.ConditionFunc{
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Added, nil
},
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Modified, nil
},
}
timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}

View File

@@ -43,15 +43,6 @@ import (
"k8s.io/kubernetes/pkg/watch"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (watch.Interface, error)
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.

View File

@@ -119,7 +119,7 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro
return b.CoreClient.Secrets(b.Namespace).Watch(options)
},
}
_, err = watch.ListWatchUntil(30*time.Second, lw,
_, err = cache.ListWatchUntil(30*time.Second, lw,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:

View File

@@ -22,8 +22,6 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/meta:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/net:go_default_library",

View File

@@ -19,9 +19,6 @@ package watch
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
@@ -84,78 +81,3 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
}
return lastEvent, nil
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (Interface, error)
}
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...ConditionFunc) (*Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(api.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &Event{Type: Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watch, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
return Until(timeout, watch, remainingConditions...)
}

View File

@@ -23,7 +23,6 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
@@ -163,54 +162,3 @@ func TestUntilErrorCondition(t *testing.T) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}
type lw struct {
list runtime.Object
watch Interface
}
func (w lw) List(options api.ListOptions) (runtime.Object, error) {
return w.list, nil
}
func (w lw) Watch(options api.ListOptions) (Interface, error) {
return w.watch, nil
}
func TestListWatchUntil(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Modify(obj)
}()
listwatch := lw{
list: &api.PodList{Items: []api.Pod{{}}},
watch: fw,
}
conditions := []ConditionFunc{
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Added, nil
},
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Modified, nil
},
}
timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}

View File

@@ -282,13 +282,13 @@ Opaque resources should account opaque integer resources in pods with multiple c
Opaque resources should not break pods that do not consume opaque integer resources.,ConnorDoyle,0
Opaque resources should not schedule pods that exceed the available amount of opaque integer resource.,ConnorDoyle,0
Opaque resources should schedule pods that do consume opaque integer resources.,ConnorDoyle,0
PersistentVolumes with Single PV - PVC pairs create a PV and a pre-bound PVC: test write access,roberthbailey,1
PersistentVolumes with Single PV - PVC pairs create a PVC and a pre-bound PV: test write access,piosz,1
PersistentVolumes with Single PV - PVC pairs create a PVC and non-pre-bound PV: test write access,sttts,1
PersistentVolumes with Single PV - PVC pairs should create a non-pre-bound PV and PVC: test write access,yujuhong,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 2 PVs and 4 PVCs: test write access,brendandburns,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 3 PVs and 3 PVCs: test write access,hurf,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 4 PVs and 2 PVCs: test write access,jlowdermilk,1
PersistentVolumes with Single PV - PVC pairs create a PV and a pre-bound PVC: test write access,caesarxuchao,1
PersistentVolumes with Single PV - PVC pairs create a PVC and a pre-bound PV: test write access,caesarxuchao,1
PersistentVolumes with Single PV - PVC pairs create a PVC and non-pre-bound PV: test write access,caesarxuchao,1
PersistentVolumes with Single PV - PVC pairs should create a non-pre-bound PV and PVC: test write access,caesarxuchao,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 2 PVs and 4 PVCs: test write access,caesarxuchao,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 3 PVs and 3 PVCs: test write access,caesarxuchao,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 4 PVs and 2 PVCs: test write access,caesarxuchao,1
Pet Store should scale to persist a nominal number ( * ) of transactions in * seconds,xiang90,1
Pet set recreate should recreate evicted statefulset,roberthbailey,1
"Pod Disks Should schedule a pod w/ a RW PD, gracefully remove it, then schedule it on another host",alex-mohr,1
@@ -453,10 +453,7 @@ k8s.io/kubernetes/cmd/kube-apiserver/app,nikhiljindal,0
k8s.io/kubernetes/cmd/kube-apiserver/app/options,nikhiljindal,0
k8s.io/kubernetes/cmd/kube-discovery/app,pmorie,1
k8s.io/kubernetes/cmd/kube-proxy/app,luxas,1
k8s.io/kubernetes/cmd/kubeadm/app/cmd,davidopp,1
k8s.io/kubernetes/cmd/kubeadm/app/images,saad-ali,1
k8s.io/kubernetes/cmd/kubeadm/app/util,eparis,1
k8s.io/kubernetes/cmd/kubeadm/app/cmd,vishh,1
k8s.io/kubernetes/cmd/kubeadm/app/cmd,caesarxuchao,1
k8s.io/kubernetes/cmd/kubeadm/app/images,davidopp,1
k8s.io/kubernetes/cmd/kubeadm/app/preflight,apprenda,0
k8s.io/kubernetes/cmd/kubeadm/app/util,krousey,1
@@ -631,9 +628,9 @@ k8s.io/kubernetes/pkg/kubelet/qos,vishh,0
k8s.io/kubernetes/pkg/kubelet/rkt,apelisse,1
k8s.io/kubernetes/pkg/kubelet/rktshim,mml,1
k8s.io/kubernetes/pkg/kubelet/server,timstclair,0
k8s.io/kubernetes/pkg/kubelet/server/portforward,saad-ali,1
k8s.io/kubernetes/pkg/kubelet/server/portforward,caesarxuchao,1
k8s.io/kubernetes/pkg/kubelet/server/stats,timstclair,0
k8s.io/kubernetes/pkg/kubelet/server/streaming,derekwaynecarr,1
k8s.io/kubernetes/pkg/kubelet/server/streaming,caesarxuchao,1
k8s.io/kubernetes/pkg/kubelet/status,mwielgus,1
k8s.io/kubernetes/pkg/kubelet/sysctl,piosz,1
k8s.io/kubernetes/pkg/kubelet/types,jlowdermilk,1
