diff --git a/pkg/client/cache/poller_test.go b/pkg/client/cache/poller_test.go index 2c8b240b71b..2c756b31275 100644 --- a/pkg/client/cache/poller_test.go +++ b/pkg/client/cache/poller_test.go @@ -104,6 +104,8 @@ func TestPoller_sync(t *testing.T) { } func TestPoller_Run(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) s := NewStore(testPairKeyFunc) const count = 10 var called = 0 @@ -118,7 +120,7 @@ func TestPoller_Run(t *testing.T) { return testEnumerator{}, nil } return nil, errors.New("transient error") - }, time.Millisecond, s).Run() + }, time.Millisecond, s).RunUntil(stopCh) // The test here is that we get called at least count times. <-done diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index f484b3a2baf..0577c57f89b 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -49,18 +49,23 @@ type Reflector struct { listerWatcher ListerWatcher // period controls timing between one watch ending and // the beginning of the next one. - period time.Duration + period time.Duration + resyncPeriod time.Duration } // NewReflector creates a new Reflector object which will keep the given store up to // date with the server's contents for the given resource. Reflector promises to // only put things in the store that have the type of expectedType. -func NewReflector(lw ListerWatcher, expectedType interface{}, store Store) *Reflector { +// If resyncPeriod is non-zero, then lists will be executed after every resyncPeriod, +// so that you can use reflectors to periodically process everything as well as +// incrementally processing the things that change.
+func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { r := &Reflector{ listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), period: time.Second, + resyncPeriod: resyncPeriod, } return r } @@ -77,8 +82,25 @@ func (r *Reflector) RunUntil(stopCh <-chan struct{}) { go util.Until(func() { r.listAndWatch() }, r.period, stopCh) } +var ( + // nothing will ever be sent down this channel + neverExitWatch <-chan time.Time = make(chan time.Time) + + // Used to indicate that watching stopped so that a resync could happen. + errorResyncRequested = errors.New("resync channel fired") +) + +// resyncChan returns a channel which will receive something when a resync is required. +func (r *Reflector) resyncChan() <-chan time.Time { + if r.resyncPeriod == 0 { + return neverExitWatch + } + return time.After(r.resyncPeriod) +} + func (r *Reflector) listAndWatch() { var resourceVersion string + exitWatch := r.resyncChan() list, err := r.listerWatcher.List() if err != nil { @@ -114,8 +136,10 @@ func (r *Reflector) listAndWatch() { } return } - if err := r.watchHandler(w, &resourceVersion); err != nil { - glog.Errorf("watch of %v ended with error: %v", r.expectedType, err) + if err := r.watchHandler(w, &resourceVersion, exitWatch); err != nil { + if err != errorResyncRequested { + glog.Errorf("watch of %v ended with error: %v", r.expectedType, err) + } return } } @@ -132,41 +156,47 @@ func (r *Reflector) syncWith(items []runtime.Object) error { } // watchHandler watches w and keeps *resourceVersion up to date. 
-func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) error { +func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, exitWatch <-chan time.Time) error { start := time.Now() eventCount := 0 +loop: for { - event, ok := <-w.ResultChan() - if !ok { - break + select { + case <-exitWatch: + w.Stop() + return errorResyncRequested + case event, ok := <-w.ResultChan(): + if !ok { + break loop + } + if event.Type == watch.Error { + return apierrs.FromObject(event.Object) + } + if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { + glog.Errorf("expected type %v, but watch event object had type %v", e, a) + continue + } + meta, err := meta.Accessor(event.Object) + if err != nil { + glog.Errorf("unable to understand watch event %#v", event) + continue + } + switch event.Type { + case watch.Added: + r.store.Add(event.Object) + case watch.Modified: + r.store.Update(event.Object) + case watch.Deleted: + // TODO: Will any consumers need access to the "last known + // state", which is passed in event.Object? If so, may need + // to change this. + r.store.Delete(event.Object) + default: + glog.Errorf("unable to understand watch event %#v", event) + } + *resourceVersion = meta.ResourceVersion() + eventCount++ } - if event.Type == watch.Error { - return apierrs.FromObject(event.Object) - } - if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { - glog.Errorf("expected type %v, but watch event object had type %v", e, a) - continue - } - meta, err := meta.Accessor(event.Object) - if err != nil { - glog.Errorf("unable to understand watch event %#v", event) - continue - } - switch event.Type { - case watch.Added: - r.store.Add(event.Object) - case watch.Modified: - r.store.Update(event.Object) - case watch.Deleted: - // TODO: Will any consumers need access to the "last known - // state", which is passed in event.Object? If so, may need - // to change this. 
- r.store.Delete(event.Object) - default: - glog.Errorf("unable to understand watch event %#v", event) - } - *resourceVersion = meta.ResourceVersion() - eventCount++ } watchDuration := time.Now().Sub(start) diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 38f5a6eaaae..5996329bd32 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -36,15 +37,27 @@ func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { return t.WatchFunc(resourceVersion) } +func TestReflector_resyncChan(t *testing.T) { + s := NewStore(MetaNamespaceKeyFunc) + g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond) + a, b := g.resyncChan(), time.After(100*time.Millisecond) + select { + case <-a: + t.Logf("got timeout as expected") + case <-b: + t.Errorf("resyncChan() is at least 99 milliseconds late??") + } +} + func TestReflector_watchHandlerError(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &api.Pod{}, s) + g := NewReflector(&testLW{}, &api.Pod{}, s, 0) fw := watch.NewFake() go func() { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV) + err := g.watchHandler(fw, &resumeRV, neverExitWatch) if err == nil { t.Errorf("unexpected non-error") } @@ -52,7 +65,7 @@ func TestReflector_watchHandlerError(t *testing.T) { func TestReflector_watchHandler(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &api.Pod{}, s) + g := NewReflector(&testLW{}, &api.Pod{}, s, 0) fw := watch.NewFake() s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}) @@ -64,7 +77,7 @@ func TestReflector_watchHandler(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV) + err := 
g.watchHandler(fw, &resumeRV, neverExitWatch) if err != nil { t.Errorf("unexpected error %v", err) } @@ -101,6 +114,19 @@ func TestReflector_watchHandler(t *testing.T) { } } +func TestReflector_watchHandlerTimeout(t *testing.T) { + s := NewStore(MetaNamespaceKeyFunc) + g := NewReflector(&testLW{}, &api.Pod{}, s, 0) + fw := watch.NewFake() + var resumeRV string + exit := make(chan time.Time, 1) + exit <- time.Now() + err := g.watchHandler(fw, &resumeRV, exit) + if err != errorResyncRequested { + t.Errorf("expected timeout error, but got %q", err) + } +} + func TestReflector_listAndWatch(t *testing.T) { createdFakes := make(chan *watch.FakeWatcher) @@ -125,7 +151,7 @@ func TestReflector_listAndWatch(t *testing.T) { }, } s := NewFIFO(MetaNamespaceKeyFunc) - r := NewReflector(lw, &api.Pod{}, s) + r := NewReflector(lw, &api.Pod{}, s, 0) go r.listAndWatch() ids := []string{"foo", "bar", "baz", "qux", "zoo"} @@ -242,7 +268,7 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) { return item.list, item.listErr }, } - r := NewReflector(lw, &api.Pod{}, s) + r := NewReflector(lw, &api.Pod{}, s, 0) r.listAndWatch() } } diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index b8062504f86..9800373360b 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -49,7 +49,7 @@ func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{} } updates <- kubelet.PodUpdate{bpods, kubelet.SET, kubelet.ApiserverSource} } - cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)).Run() + cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run() } func getHostFieldLabel(apiVersion string) string { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 4f0c954547f..e458241bbf4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -102,7 +102,12 @@ func NewMainKubelet( serviceStore := 
cache.NewStore(cache.MetaNamespaceKeyFunc) if kubeClient != nil { - cache.NewReflector(cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()), &api.Service{}, serviceStore).Run() + cache.NewReflector( + cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()), + &api.Service{}, + serviceStore, + 0, + ).Run() } serviceLister := &cache.StoreToServiceLister{serviceStore} diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index 0cfd7bbe4e0..3e18f08c6d9 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -90,6 +90,7 @@ func NewProvision(c client.Interface) admission.Interface { }, &api.Namespace{}, store, + 0, ) reflector.Run() return &provision{ diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index 5a806d984d0..ac636b82a70 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -94,6 +94,7 @@ func NewExists(c client.Interface) admission.Interface { }, &api.Namespace{}, store, + 0, ) reflector.Run() return &exists{ diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 9f33dc4b7f4..494a9f5aee2 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -114,18 +114,18 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe } // Watch and queue pods that need scheduling. - cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue).Run() + cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run() // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. 
- cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store).Run() + cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store, 0).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. if false { // Disable this code until minions support watches. Note when this code is enabled, // we need to make sure minion ListWatcher has proper FieldSelector. - cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store).Run() + cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store, 0).Run() } else { cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run() } @@ -133,7 +133,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe // Watch and cache all service objects. Scheduler needs to find all pods // created by the same service, so that it can spread them correctly. // Cache this locally. - cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run() + cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).Run() r := rand.New(rand.NewSource(time.Now().UnixNano()))