From 07b8373ab393547239ed4fe227758a1715480f22 Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Fri, 3 Aug 2018 15:22:42 +0200 Subject: [PATCH 1/5] Move ListWatchUntil to its kin --- .../k8s.io/client-go/tools/cache/listwatch.go | 78 ------------------- .../src/k8s.io/client-go/tools/watch/until.go | 76 ++++++++++++++++++ 2 files changed, 76 insertions(+), 78 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/listwatch.go b/staging/src/k8s.io/client-go/tools/cache/listwatch.go index 30463aea7de..3db9639637e 100644 --- a/staging/src/k8s.io/client-go/tools/cache/listwatch.go +++ b/staging/src/k8s.io/client-go/tools/cache/listwatch.go @@ -20,15 +20,12 @@ import ( "context" "time" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/pager" - watchtools "k8s.io/client-go/tools/watch" ) // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. @@ -113,78 +110,3 @@ func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { return lw.WatchFunc(options) } - -// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout -// if timeout is exceeded without all conditions returning true, or an error if an error occurs. -// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. -func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { - if len(conditions) == 0 { - return nil, nil - } - - list, err := lw.List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - initialItems, err := meta.ExtractList(list) - if err != nil { - return nil, err - } - - // use the initial items as simulated "adds" - var lastEvent *watch.Event - currIndex := 0 - passedConditions := 0 - for _, condition := range conditions { - // check the next condition against the previous event and short circuit waiting for the next watch - if lastEvent != nil { - done, err := condition(*lastEvent) - if err != nil { - return lastEvent, err - } - if done { - passedConditions = passedConditions + 1 - continue - } - } - - ConditionSucceeded: - for currIndex < len(initialItems) { - lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]} - currIndex++ - - done, err := condition(*lastEvent) - if err != nil { - return lastEvent, err - } - if done { - passedConditions = passedConditions + 1 - break ConditionSucceeded - } - } - } - if passedConditions == len(conditions) { - return lastEvent, nil - } - remainingConditions := conditions[passedConditions:] - - metaObj, err := meta.ListAccessor(list) - if err != nil { - return nil, err - } - currResourceVersion := metaObj.GetResourceVersion() - - watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion}) - if err != nil { - return nil, err - } - - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() - evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...) - if err == watchtools.ErrWatchClosed { - // present a consistent error interface to callers - err = wait.ErrWaitTimeout - } - return evt, err -} diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index 4a891b23511..724e726942f 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -22,6 +22,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" ) @@ -100,3 +101,78 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) ( return context.WithTimeout(parent, timeout) } + +// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout +// if timeout is exceeded without all conditions returning true, or an error if an error occurs. +// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. +func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { + if len(conditions) == 0 { + return nil, nil + } + + list, err := lw.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + initialItems, err := meta.ExtractList(list) + if err != nil { + return nil, err + } + + // use the initial items as simulated "adds" + var lastEvent *watch.Event + currIndex := 0 + passedConditions := 0 + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + continue + } + } + + ConditionSucceeded: + for currIndex < len(initialItems) { + lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]} + currIndex++ + + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + break ConditionSucceeded + } + } + } + if passedConditions == len(conditions) { + return lastEvent, nil + } + remainingConditions := conditions[passedConditions:] + + metaObj, err := meta.ListAccessor(list) + if err != nil { + return nil, err + } + currResourceVersion := metaObj.GetResourceVersion() + + watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion}) + if err != nil { + return nil, err + } + + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...) + if err == watchtools.ErrWatchClosed { + // present a consistent error interface to callers + err = wait.ErrWaitTimeout + } + return evt, err +} From e434f3189e81e473aa89e3589ca81545d1341cf8 Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Fri, 3 Aug 2018 16:08:28 +0200 Subject: [PATCH 2/5] Deprecate ListWatchUntil, fix it and call places --- pkg/client/tests/listwatch_test.go | 2 +- pkg/controller/client_builder.go | 3 ++- staging/src/k8s.io/client-go/tools/watch/until.go | 13 +++++++++---- .../k8s.io/client-go/util/certificate/csr/csr.go | 6 ++++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/client/tests/listwatch_test.go b/pkg/client/tests/listwatch_test.go index 0fcc79c4867..35a8a256e44 100644 --- a/pkg/client/tests/listwatch_test.go +++ b/pkg/client/tests/listwatch_test.go @@ -214,7 +214,7 @@ func TestListWatchUntil(t *testing.T) { } timeout := 10 * time.Second - lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...) + lastEvent, err := watchtools.ListWatchUntil(timeout, listwatch, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } diff --git a/pkg/controller/client_builder.go b/pkg/controller/client_builder.go index 5f32728d12e..bcf2182d090 100644 --- a/pkg/controller/client_builder.go +++ b/pkg/controller/client_builder.go @@ -33,6 +33,7 @@ import ( v1core "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/serviceaccount" @@ -122,7 +123,7 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro return b.CoreClient.Secrets(b.Namespace).Watch(options) }, } - _, err = cache.ListWatchUntil(30*time.Second, lw, + _, err = watchtools.ListWatchUntil(30*time.Second, lw, func(event watch.Event) (bool, error) { switch event.Type { case watch.Deleted: diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index 724e726942f..1907b68a3f0 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -23,8 +23,10 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" ) // ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, @@ -105,7 +107,10 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) ( // ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout // if timeout is exceeded without all conditions returning true, or an error if an error occurs. // TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. -func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { +// TODO: remove when no longer used +// +// Deprecated: Use UntilWithSync instead. +func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) { if len(conditions) == 0 { return nil, nil } @@ -167,10 +172,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch return nil, err } - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout) defer cancel() - evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...) - if err == watchtools.ErrWatchClosed { + evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...) + if err == ErrWatchClosed { // present a consistent error interface to callers err = wait.ErrWaitTimeout } diff --git a/staging/src/k8s.io/client-go/util/certificate/csr/csr.go b/staging/src/k8s.io/client-go/util/certificate/csr/csr.go index 22112a5b5b6..4a53352fee0 100644 --- a/staging/src/k8s.io/client-go/util/certificate/csr/csr.go +++ b/staging/src/k8s.io/client-go/util/certificate/csr/csr.go @@ -24,10 +24,11 @@ import ( "encoding/base64" "encoding/pem" "fmt" - "github.com/golang/glog" "reflect" "time" + "github.com/golang/glog" + certificates "k8s.io/api/certificates/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/watch" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" certutil "k8s.io/client-go/util/cert" ) @@ -121,7 +123,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) { fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String() - event, err := cache.ListWatchUntil( + event, err := watchtools.ListWatchUntil( timeout, &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { From 866cc1acab6c1c30a7550b2de7160c8052be884d Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Fri, 3 Aug 2018 16:45:41 +0200 Subject: [PATCH 3/5] Add UntilWithSync (informer based) --- .../k8s.io/apimachinery/pkg/watch/watch.go | 47 ++++ .../apimachinery/pkg/watch/watch_test.go | 38 +++ .../client-go/tools/watch/informerwatcher.go | 114 ++++++++ .../tools/watch/informerwatcher_test.go | 236 ++++++++++++++++ .../src/k8s.io/client-go/tools/watch/until.go | 42 +++ .../client-go/tools/watch/until_test.go | 129 +++++++++ test/integration/apimachinery/main_test.go | 27 ++ .../apimachinery/watch_restart_test.go | 258 ++++++++++++++++++ 8 files changed, 891 insertions(+) create mode 100644 staging/src/k8s.io/client-go/tools/watch/informerwatcher.go create mode 100644 staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go create mode 100644 test/integration/apimachinery/main_test.go create mode 100644 test/integration/apimachinery/watch_restart_test.go diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/watch.go b/staging/src/k8s.io/apimachinery/pkg/watch/watch.go index 5c1380b2349..a627d1d572c 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/watch.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/watch.go @@ -268,3 +268,50 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) { } } } + +// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe. +type ProxyWatcher struct { + result chan Event + stopCh chan struct{} + + mutex sync.Mutex + stopped bool +} + +var _ Interface = &ProxyWatcher{} + +// NewProxyWatcher creates new ProxyWatcher by wrapping a channel +func NewProxyWatcher(ch chan Event) *ProxyWatcher { + return &ProxyWatcher{ + result: ch, + stopCh: make(chan struct{}), + stopped: false, + } +} + +// Stop implements Interface +func (pw *ProxyWatcher) Stop() { + pw.mutex.Lock() + defer pw.mutex.Unlock() + if !pw.stopped { + pw.stopped = true + close(pw.stopCh) + } +} + +// Stopping returns true if Stop() has been called +func (pw *ProxyWatcher) Stopping() bool { + pw.mutex.Lock() + defer pw.mutex.Unlock() + return pw.stopped +} + +// ResultChan implements Interface +func (pw *ProxyWatcher) ResultChan() <-chan Event { + return pw.result +} + +// StopChan returns stop channel +func (pw *ProxyWatcher) StopChan() <-chan struct{} { + return pw.stopCh +} diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go index bdf7fedd4af..4fb159b0dd8 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go @@ -17,6 +17,7 @@ limitations under the License. package watch_test import ( + "reflect" "testing" "k8s.io/apimachinery/pkg/runtime" @@ -135,3 +136,40 @@ func TestEmpty(t *testing.T) { t.Errorf("unexpected result channel result") } } + +func TestProxyWatcher(t *testing.T) { + events := []Event{ + {Added, testType("foo")}, + {Modified, testType("qux")}, + {Modified, testType("bar")}, + {Deleted, testType("bar")}, + {Error, testType("error: blah")}, + } + + ch := make(chan Event, len(events)) + w := NewProxyWatcher(ch) + + for _, e := range events { + ch <- e + } + + for _, e := range events { + g := <-w.ResultChan() + if !reflect.DeepEqual(e, g) { + t.Errorf("Expected %#v, got %#v", e, g) + continue + } + } + + w.Stop() + + select { + // Closed channel always reads immediately + case <-w.StopChan(): + default: + t.Error("Channel isn't closed") + } + + // Test double close + w.Stop() +} diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go new file mode 100644 index 00000000000..35a34694939 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go @@ -0,0 +1,114 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +func newTicketer() *ticketer { + return &ticketer{ + cond: sync.NewCond(&sync.Mutex{}), + } +} + +type ticketer struct { + counter uint64 + + cond *sync.Cond + current uint64 +} + +func (t *ticketer) GetTicket() uint64 { + // -1 to start from 0 + return atomic.AddUint64(&t.counter, 1) - 1 +} + +func (t *ticketer) WaitForTicket(ticket uint64, f func()) { + t.cond.L.Lock() + defer t.cond.L.Unlock() + for ticket != t.current { + t.cond.Wait() + } + + f() + + t.current++ + t.cond.Broadcast() +} + +// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. +func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) { + ch := make(chan watch.Event) + w := watch.NewProxyWatcher(ch) + t := newTicketer() + + indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + select { + case ch <- watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + UpdateFunc: func(old, new interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + select { + case ch <- watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + DeleteFunc: func(obj interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using watch API based on watch.Event + // but the caller can filter such objects by checking if metadata.deletionTimestamp is set + obj = staleObj + } + + select { + case ch <- watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + }, cache.Indexers{}) + + go func() { + informer.Run(w.StopChan()) + }() + + return indexer, informer, w +} diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go new file mode 100644 index 00000000000..e94b4d25638 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go @@ -0,0 +1,236 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "math/rand" + "reflect" + "sort" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/watch" + fakeclientset "k8s.io/client-go/kubernetes/fake" + testcore "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +type byEventTypeAndName []watch.Event + +func (a byEventTypeAndName) Len() int { return len(a) } +func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byEventTypeAndName) Less(i, j int) bool { + if a[i].Type < a[j].Type { + return true + } + + if a[i].Type > a[j].Type { + return false + } + + return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name +} + +func TestTicketer(t *testing.T) { + tg := newTicketer() + + const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines + var tickets []uint64 + for i := 0; i < numTickets; i++ { + ticket := tg.GetTicket() + tickets = append(tickets, ticket) + + exp, got := uint64(i), ticket + if got != exp { + t.Fatalf("expected ticket %d, got %d", exp, got) + } + } + + // shuffle tickets + rand.Shuffle(len(tickets), func(i, j int) { + tickets[i], tickets[j] = tickets[j], tickets[i] + }) + + res := make(chan uint64, len(tickets)) + for _, ticket := range tickets { + go func(ticket uint64) { + time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) + tg.WaitForTicket(ticket, func() { + res <- ticket + }) + }(ticket) + } + + for i := 0; i < numTickets; i++ { + exp, got := uint64(i), <-res + if got != exp { + t.Fatalf("expected ticket %d, got %d", exp, got) + } + } +} + +func TestNewInformerWatcher(t *testing.T) { + // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. + tt := []struct { + name string + objects []runtime.Object + events []watch.Event + }{ + { + name: "basic test", + objects: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + }, + StringData: map[string]string{ + "foo-1": "initial", + }, + }, + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + }, + StringData: map[string]string{ + "foo-2": "initial", + }, + }, + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + }, + StringData: map[string]string{ + "foo-3": "initial", + }, + }, + }, + events: []watch.Event{ + { + Type: watch.Added, + Object: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-4", + }, + StringData: map[string]string{ + "foo-4": "initial", + }, + }, + }, + { + Type: watch.Modified, + Object: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + }, + StringData: map[string]string{ + "foo-2": "new", + }, + }, + }, + { + Type: watch.Deleted, + Object: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + }, + }, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + var expected []watch.Event + for _, o := range tc.objects { + expected = append(expected, watch.Event{ + Type: watch.Added, + Object: o.DeepCopyObject(), + }) + } + for _, e := range tc.events { + expected = append(expected, *e.DeepCopy()) + } + + fake := fakeclientset.NewSimpleClientset(tc.objects...) + fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false) + fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil)) + + for _, e := range tc.events { + fakeWatch.Action(e.Type, e.Object) + } + + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return fake.Core().Secrets("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fake.Core().Secrets("").Watch(options) + }, + } + _, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{}) + + var result []watch.Event + loop: + for { + var event watch.Event + var ok bool + select { + case event, ok = <-w.ResultChan(): + if !ok { + t.Errorf("Failed to read event: channel is already closed!") + return + } + + result = append(result, *event.DeepCopy()) + case <-time.After(time.Second * 1): + // All the events are buffered -> this means we are done + // Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event + break loop + } + } + + // Informers don't guarantee event order so we need to sort these arrays to compare them + sort.Sort(byEventTypeAndName(expected)) + sort.Sort(byEventTypeAndName(result)) + + if !reflect.DeepEqual(expected, result) { + t.Error(spew.Errorf("\nexpected: %#v,\ngot: %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result))) + return + } + + // Fill in some data to test watch closing while there are some events to be read + for _, e := range tc.events { + fakeWatch.Action(e.Type, e.Object) + } + + // Stop before reading all the data to make sure the informer can deal with closed channel + w.Stop() + + // Wait a bit to see if the informer won't panic + // TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591) + time.Sleep(1 * time.Second) + }) + } + +} diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index 1907b68a3f0..93357884397 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -19,16 +19,22 @@ package watch import ( "context" "errors" + "fmt" "time" "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) +// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition failed or detected an error state. +type PreconditionFunc func(store cache.Store) (bool, error) + // ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, // or an error if the condition cannot be checked and should terminate. In general, it is better to define // level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed @@ -89,6 +95,42 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions return lastEvent, nil } +// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced, +// and watches the output until each provided condition succeeds, in a way that is identical +// to function UntilWithoutRetry. (See above.) +// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'. +// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will +// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple +// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will +// re-list to recover and you always get an event, if there has been a change, after recovery. +// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for +// particular object, not between more of them even it's the same resource. +// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: +// waiting for object reaching a state, "small" controllers, ... +func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { + indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType) + // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and + // let UntilWithoutRetry to stop it + defer watcher.Stop() + + if precondition != nil { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err()) + } + + done, err := precondition(indexer) + if err != nil { + return nil, err + } + + if done { + return nil, nil + } + } + + return UntilWithoutRetry(ctx, watcher, conditions...) +} + // ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { if timeout < 0 { diff --git a/staging/src/k8s.io/client-go/tools/watch/until_test.go b/staging/src/k8s.io/client-go/tools/watch/until_test.go index e766acd7361..dd0559461eb 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until_test.go +++ b/staging/src/k8s.io/client-go/tools/watch/until_test.go @@ -19,14 +19,19 @@ package watch import ( "context" "errors" + "reflect" "strings" "testing" "time" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + fakeclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" ) type fakePod struct { @@ -172,3 +177,127 @@ func TestUntilErrorCondition(t *testing.T) { t.Fatalf("expected %q in error string, got %q", expected, err.Error()) } } + +func TestUntilWithSync(t *testing.T) { + // FIXME: test preconditions + tt := []struct { + name string + lw *cache.ListWatch + preconditionFunc PreconditionFunc + conditionFunc ConditionFunc + expectedErr error + expectedEvent *watch.Event + }{ + { + name: "doesn't wait for sync with no precondition", + lw: &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + select {} + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + select {} + }, + }, + preconditionFunc: nil, + conditionFunc: func(e watch.Event) (bool, error) { + return true, nil + }, + expectedErr: errors.New("timed out waiting for the condition"), + expectedEvent: nil, + }, + { + name: "waits indefinitely with precondition if it can't sync", + lw: &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + select {} + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + select {} + }, + }, + preconditionFunc: func(store cache.Store) (bool, error) { + return true, nil + }, + conditionFunc: func(e watch.Event) (bool, error) { + return true, nil + }, + expectedErr: errors.New("UntilWithSync: unable to sync caches: context deadline exceeded"), + expectedEvent: nil, + }, + { + name: "precondition can stop the loop", + lw: func() *cache.ListWatch { + fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}) + + return &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return fakeclient.CoreV1().Secrets("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fakeclient.CoreV1().Secrets("").Watch(options) + }, + } + }(), + preconditionFunc: func(store cache.Store) (bool, error) { + _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"}) + if err != nil { + return true, err + } + if exists { + return true, nil + } + return false, nil + }, + conditionFunc: func(e watch.Event) (bool, error) { + return true, errors.New("should never reach this") + }, + expectedErr: nil, + expectedEvent: nil, + }, + { + name: "precondition lets it proceed to regular condition", + lw: func() *cache.ListWatch { + fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}) + + return &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return fakeclient.CoreV1().Secrets("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fakeclient.CoreV1().Secrets("").Watch(options) + }, + } + }(), + preconditionFunc: func(store cache.Store) (bool, error) { + return false, nil + }, + conditionFunc: func(e watch.Event) (bool, error) { + if e.Type == watch.Added { + return true, nil + } + panic("no other events are expected") + }, + expectedErr: nil, + expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}}, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + // Informer waits for caches to sync by polling in 100ms intervals, + // timeout needs to be reasonably higher + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc) + + if !reflect.DeepEqual(err, tc.expectedErr) { + t.Errorf("expected error %#v, got %#v", tc.expectedErr, err) + } + + if !reflect.DeepEqual(event, tc.expectedEvent) { + t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event) + } + }) + } +} diff --git a/test/integration/apimachinery/main_test.go b/test/integration/apimachinery/main_test.go new file mode 100644 index 00000000000..673c1cc1a03 --- /dev/null +++ b/test/integration/apimachinery/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apimachinery + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/apimachinery/watch_restart_test.go b/test/integration/apimachinery/watch_restart_test.go new file mode 100644 index 00000000000..547ab8dd402 --- /dev/null +++ b/test/integration/apimachinery/watch_restart_test.go @@ -0,0 +1,258 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apimachinery + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/test/integration/framework" +) + +func noopNormalization(output []string) []string { + return output +} + +func normalizeInformerOutputFunc(initialVal string) func(output []string) []string { + return func(output []string) []string { + result := make([]string, 0, len(output)) + + // Removes initial value and all of its direct repetitions + lastVal := initialVal + for _, v := range output { + // Make values unique as informer(List+Watch) duplicates some events + if v == lastVal { + continue + } + result = append(result, v) + lastVal = v + } + + return result + } +} + +func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { + // Has to be longer than 5 seconds + timeout := 2 * time.Minute + + // Set up a master + masterConfig := framework.NewIntegrationTestMasterConfig() + // Timeout is set random between MinRequestTimeout and 2x + masterConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4 + _, s, closeFn := framework.RunAMaster(masterConfig) + defer closeFn() + + config := &restclient.Config{ + Host: s.URL, + ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[corev1.GroupName].GroupVersion()}, + } + + namespaceObject := framework.CreateTestingNamespace("retry-watch", s, t) + defer framework.DeleteTestingNamespace(namespaceObject, s, t) + + getListFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) *v1.SecretList { + return func(options metav1.ListOptions) *v1.SecretList { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String() + res, err := c.CoreV1().Secrets(secret.Namespace).List(options) + if err != nil { + t.Fatalf("Failed to list Secrets: %v", err) + } + return res + } + } + + getWatchFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) (watch.Interface, error) { + return func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String() + res, err := c.CoreV1().Secrets(secret.Namespace).Watch(options) + if err != nil { + t.Fatalf("Failed to create a watcher on Secrets: %v", err) + } + return res, err + } + } + + generateEvents := func(t *testing.T, c *kubernetes.Clientset, secret *v1.Secret, referenceOutput *[]string, stopChan chan struct{}, stoppedChan chan struct{}) { + defer close(stoppedChan) + counter := 0 + + // These 5 seconds are here to protect against a race at the end when we could write something there at the same time as watch.Until ends + softTimeout := timeout - 5*time.Second + if softTimeout < 0 { + panic("Timeout has to be grater than 5 seconds!") + } + endChannel := time.After(softTimeout) + for { + select { + // TODO: get this lower once we figure out how to extend ETCD cache + case <-time.After(1000 * time.Millisecond): + counter = counter + 1 + + patch := fmt.Sprintf(`{"metadata": {"annotations": {"count": "%d"}}}`, counter) + _, err := c.CoreV1().Secrets(secret.Namespace).Patch(secret.Name, types.StrategicMergePatchType, []byte(patch)) + if err != nil { + t.Fatalf("Failed to patch secret: %v", err) + } + + *referenceOutput = append(*referenceOutput, fmt.Sprintf("%d", counter)) + case <-endChannel: + return + case <-stopChan: + return + } + } + } + + initialCount := "0" + newTestSecret := func(name string) *v1.Secret { + return &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespaceObject.Name, + Annotations: map[string]string{ + "count": initialCount, + }, + }, + Data: map[string][]byte{ + "data": []byte("value1\n"), + }, + } + } + + tt := []struct { + name string + succeed bool + secret *v1.Secret + getWatcher func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) + normalizeOutputFunc func(referenceOutput []string) []string + }{ + { + name: "regular watcher should fail", + succeed: false, + secret: newTestSecret("secret-01"), + getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) { + options := metav1.ListOptions{ + ResourceVersion: secret.ResourceVersion, + } + return getWatchFunc(c, secret)(options) + }, // regular watcher; unfortunately destined to fail + normalizeOutputFunc: noopNormalization, + }, + { + name: "InformerWatcher survives closed watches", + succeed: true, + secret: newTestSecret("secret-03"), + getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) { + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return getListFunc(c, secret)(options), nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return getWatchFunc(c, secret)(options) + }, + } + _, _, w := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{}) + return w, nil + }, + normalizeOutputFunc: normalizeInformerOutputFunc(initialCount), + }, + } + + for _, tmptc := range tt { + tc := tmptc // we need to copy it for parallel runs + t.Run(tc.name, func(t *testing.T) { + c, err := kubernetes.NewForConfig(config) + if err != nil { + t.Fatalf("Failed to create clientset: %v", err) + } + + secret, err := c.CoreV1().Secrets(tc.secret.Namespace).Create(tc.secret) + if err != nil { + t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err) + } + + watcher, err := tc.getWatcher(c, secret) + if err != nil { + t.Fatalf("Failed to create watcher: %v", err) + } + + var referenceOutput []string + var output []string + stopChan := make(chan struct{}) + stoppedChan := make(chan struct{}) + go generateEvents(t, c, secret, &referenceOutput, stopChan, stoppedChan) + + // Record current time to be able to asses if the timeout has been reached + startTime := time.Now() + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) { + s, ok := event.Object.(*v1.Secret) + if !ok { + t.Fatalf("Received an object that is not a Secret: %#v", event.Object) + } + output = append(output, s.Annotations["count"]) + // Watch will never end voluntarily + return false, nil + }) + watchDuration := time.Since(startTime) + close(stopChan) + <-stoppedChan + + output = tc.normalizeOutputFunc(output) + + t.Logf("Watch duration: %v; timeout: %v", watchDuration, timeout) + + if err == nil && !tc.succeed { + t.Fatalf("Watch should have timed out but it exited without an error!") + } + + if err != wait.ErrWaitTimeout && tc.succeed { + t.Fatalf("Watch exited with error: %v!", err) + } + + if watchDuration < timeout && tc.succeed { + t.Fatalf("Watch should have timed out after %v but it timed out prematurely after %v!", timeout, watchDuration) + } + + if watchDuration >= timeout && !tc.succeed { + t.Fatalf("Watch should have timed out but it succeeded!") + } + + if tc.succeed && !reflect.DeepEqual(referenceOutput, output) { + t.Fatalf("Reference and real output differ! We must have lost some events or read some multiple times!\nRef: %#v\nReal: %#v", referenceOutput, output) + } + }) + } +} From dc2cfd5d208bfdb105d055471b778f0dd87ba99d Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Fri, 3 Aug 2018 16:46:26 +0200 Subject: [PATCH 4/5] Update Bazel --- pkg/controller/BUILD | 1 + .../src/k8s.io/client-go/tools/cache/BUILD | 1 - .../src/k8s.io/client-go/tools/watch/BUILD | 21 +++++++++- .../client-go/util/certificate/csr/BUILD | 1 + test/integration/BUILD | 1 + test/integration/apimachinery/BUILD | 38 +++++++++++++++++++ 6 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 test/integration/apimachinery/BUILD diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index 6278fd367fd..027ec4e6d28 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -85,6 +85,7 @@ go_library( "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/client-go/util/integer:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//vendor/github.com/golang/glog:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/cache/BUILD b/staging/src/k8s.io/client-go/tools/cache/BUILD index b7777496d6c..0240fa6f700 100644 --- a/staging/src/k8s.io/client-go/tools/cache/BUILD +++ b/staging/src/k8s.io/client-go/tools/cache/BUILD @@ -81,7 +81,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/pager:go_default_library", - "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/client-go/util/buffer:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//vendor/github.com/golang/glog:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/watch/BUILD b/staging/src/k8s.io/client-go/tools/watch/BUILD index 590e5765478..d1994ebbbef 100644 --- a/staging/src/k8s.io/client-go/tools/watch/BUILD +++ b/staging/src/k8s.io/client-go/tools/watch/BUILD @@ -2,26 +2,43 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["until.go"], + srcs = [ + "informerwatcher.go", + "until.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/watch", importpath = "k8s.io/client-go/tools/watch", visibility = ["//visibility:public"], deps = [ + "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["until_test.go"], + srcs = [ + "informerwatcher_test.go", + "until_test.go", + ], embed = [":go_default_library"], deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/github.com/davecgh/go-spew/spew:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/util/certificate/csr/BUILD b/staging/src/k8s.io/client-go/util/certificate/csr/BUILD index 72dee726fac..e160947dbc2 100644 --- a/staging/src/k8s.io/client-go/util/certificate/csr/BUILD +++ b/staging/src/k8s.io/client-go/util/certificate/csr/BUILD @@ -22,6 +22,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/client-go/util/cert:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], diff --git a/test/integration/BUILD b/test/integration/BUILD index 1d63998457a..4fd8a5791e3 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -35,6 +35,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//test/integration/apimachinery:all-srcs", "//test/integration/apiserver:all-srcs", "//test/integration/auth:all-srcs", "//test/integration/benchmark/jsonify:all-srcs", diff --git a/test/integration/apimachinery/BUILD b/test/integration/apimachinery/BUILD new file mode 100644 index 00000000000..befaf151adb --- /dev/null +++ b/test/integration/apimachinery/BUILD @@ -0,0 +1,38 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "go_default_test", + srcs = [ + "main_test.go", + "watch_restart_test.go", + ], + deps = [ + "//pkg/api/testapi:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/tools/watch:go_default_library", + "//test/integration/framework:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) From f6836df5dd104bde62d80be411582c5d08dcaa65 Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Tue, 7 Aug 2018 20:15:14 +0200 Subject: [PATCH 5/5] Update Godeps --- staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json | 4 ---- staging/src/k8s.io/apiserver/Godeps/Godeps.json | 4 ---- staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json | 4 ---- staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json | 4 ---- staging/src/k8s.io/sample-controller/Godeps/Godeps.json | 4 ---- 5 files changed, 20 deletions(-) diff --git a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json index 25d5109efd9..ab8bdb101db 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json @@ -2054,10 +2054,6 @@ "ImportPath": "k8s.io/client-go/tools/reference", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/client-go/tools/watch", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/client-go/transport", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiserver/Godeps/Godeps.json index 56a1a70e72e..b69ee12cfc3 100644 --- a/staging/src/k8s.io/apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiserver/Godeps/Godeps.json @@ -1782,10 +1782,6 @@ "ImportPath": "k8s.io/client-go/tools/reference", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/client-go/tools/watch", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/client-go/transport", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json index 99e78461f8c..858b91e5024 100644 --- a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json @@ -1702,10 +1702,6 @@ "ImportPath": "k8s.io/client-go/tools/reference", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/client-go/tools/watch", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/client-go/transport", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json index a08df44527f..e286c27a4f5 100644 --- a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json @@ -1670,10 +1670,6 @@ "ImportPath": "k8s.io/client-go/tools/reference", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/client-go/tools/watch", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/client-go/transport", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/sample-controller/Godeps/Godeps.json b/staging/src/k8s.io/sample-controller/Godeps/Godeps.json index 9e504df0519..e64d0c761b0 100644 --- a/staging/src/k8s.io/sample-controller/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-controller/Godeps/Godeps.json @@ -1066,10 +1066,6 @@ "ImportPath": "k8s.io/client-go/tools/reference", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/client-go/tools/watch", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/client-go/transport", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"