Merge pull request #1389 from lavalamp/scheduler

make watch actually return an error when there's an error
This commit is contained in:
Brendan Burns 2014-09-22 13:59:57 -07:00
commit 3346c68d33
5 changed files with 84 additions and 17 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"errors"
"fmt" "fmt"
"reflect" "reflect"
"time" "time"
@ -99,7 +100,10 @@ func (r *Reflector) listAndWatch() {
glog.Errorf("failed to watch %v: %v", r.expectedType, err) glog.Errorf("failed to watch %v: %v", r.expectedType, err)
return return
} }
r.watchHandler(w, &resourceVersion) if err := r.watchHandler(w, &resourceVersion); err != nil {
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
return
}
} }
} }
@ -119,12 +123,13 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
} }
// watchHandler watches w and keeps *resourceVersion up to date. // watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) error {
start := time.Now()
eventCount := 0
for { for {
event, ok := <-w.ResultChan() event, ok := <-w.ResultChan()
if !ok { if !ok {
glog.Errorf("unexpected watch close") break
return
} }
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
glog.Errorf("expected type %v, but watch event object had type %v", e, a) glog.Errorf("expected type %v, but watch event object had type %v", e, a)
@ -149,5 +154,14 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
glog.Errorf("unable to understand watch event %#v", event) glog.Errorf("unable to understand watch event %#v", event)
} }
*resourceVersion = jsonBase.ResourceVersion() + 1 *resourceVersion = jsonBase.ResourceVersion() + 1
eventCount++
} }
watchDuration := time.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
return errors.New("very short watch")
}
glog.Infof("unexpected watch close - %v total items received", eventCount)
return nil
} }

View File

@ -35,6 +35,20 @@ func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) {
return t.WatchFunc(resourceVersion) return t.WatchFunc(resourceVersion)
} }
// TestReflector_watchHandlerError checks that watchHandler reports an error
// when the watch ends without having delivered a single event.
func TestReflector_watchHandlerError(t *testing.T) {
s := NewStore()
g := NewReflector(&testLW{}, &api.Pod{}, s)
fw := watch.NewFake()
go func() {
// Stop the fake watch without sending any event; presumably this closes
// its result channel so watchHandler's receive loop ends (verify against
// the watch package's fake implementation).
fw.Stop()
}()
var resumeRV uint64
err := g.watchHandler(fw, &resumeRV)
// A watch that yields no events is expected to surface as an error.
if err == nil {
t.Errorf("unexpected non-error")
}
}
func TestReflector_watchHandler(t *testing.T) { func TestReflector_watchHandler(t *testing.T) {
s := NewStore() s := NewStore()
g := NewReflector(&testLW{}, &api.Pod{}, s) g := NewReflector(&testLW{}, &api.Pod{}, s)
@ -49,7 +63,10 @@ func TestReflector_watchHandler(t *testing.T) {
fw.Stop() fw.Stop()
}() }()
var resumeRV uint64 var resumeRV uint64
g.watchHandler(fw, &resumeRV) err := g.watchHandler(fw, &resumeRV)
if err != nil {
t.Errorf("unexpected error %v", err)
}
table := []struct { table := []struct {
ID string ID string

View File

@ -18,6 +18,7 @@ package tools
import ( import (
"sync" "sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -69,7 +70,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform) w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion) go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil return w, <-w.immediateError
} }
// TransformFunc attempts to convert an object to another object for use with a watcher. // TransformFunc attempts to convert an object to another object for use with a watcher.
@ -88,6 +89,11 @@ type etcdWatcher struct {
etcdStop chan bool etcdStop chan bool
etcdCallEnded chan struct{} etcdCallEnded chan struct{}
// etcdWatch will send an error down this channel if the Watch fails.
// Otherwise, a nil will be sent down this channel watchWaitDuration
// after the watch starts.
immediateError chan error
outgoing chan watch.Event outgoing chan watch.Event
userStop chan struct{} userStop chan struct{}
stopped bool stopped bool
@ -97,20 +103,24 @@ type etcdWatcher struct {
emit func(watch.Event) emit func(watch.Event)
} }
// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates. // and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher { func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{ w := &etcdWatcher{
encoding: encoding, encoding: encoding,
versioner: versioner, versioner: versioner,
transform: transform, transform: transform,
list: list, list: list,
filter: filter, filter: filter,
etcdIncoming: make(chan *etcd.Response), etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool), etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}), etcdCallEnded: make(chan struct{}),
outgoing: make(chan watch.Event), immediateError: make(chan error),
userStop: make(chan struct{}), outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
} }
w.emit = func(e watch.Event) { w.outgoing <- e } w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate() go w.translate()
@ -118,10 +128,17 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versio
} }
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine. // as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will send nil over it.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash() defer util.HandleCrash()
defer close(w.etcdCallEnded) defer close(w.etcdCallEnded)
go func() {
// This is racy; assume that Watch will fail within 100ms if it is going to fail.
// It's still more useful than blocking until the first result shows up.
// Trying to detect the 401: watch window expired error.
<-time.After(watchWaitDuration)
w.immediateError <- nil
}()
if resourceVersion == 0 { if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok { if !ok {
@ -132,6 +149,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser { if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
w.immediateError <- err
} }
} }

View File

@ -198,6 +198,19 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
} }
} }
// TestWatchEtcdError checks that EtcdHelper.Watch returns an error when the
// underlying etcd client's Watch call fails.
func TestWatchEtcdError(t *testing.T) {
codec := latest.Codec
fakeClient := NewFakeEtcdClient(t)
// NOTE(review): presumably marks this key so the fake client's Get reports
// "not found" for it - confirm against FakeEtcdClient.
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
// The fake client's Watch returns this error as soon as it is called.
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")

h := EtcdHelper{fakeClient, codec, versioner}

_, err := h.Watch("/some/key", 0)
if err == nil {
t.Fatalf("Unexpected non-error")
}
}
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
codec := latest.Codec codec := latest.Codec
fakeClient := NewFakeEtcdClient(t) fakeClient := NewFakeEtcdClient(t)

View File

@ -57,6 +57,8 @@ type FakeEtcdClient struct {
// Write to this to prematurely stop a Watch that is running in a goroutine. // Write to this to prematurely stop a Watch that is running in a goroutine.
WatchInjectError chan<- error WatchInjectError chan<- error
WatchStop chan<- bool WatchStop chan<- bool
// If non-nil, will be returned immediately when Watch is called.
WatchImmediateError error
} }
func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient { func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
@ -250,6 +252,9 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() {
} }
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
if f.WatchImmediateError != nil {
return nil, f.WatchImmediateError
}
f.WatchResponse = receiver f.WatchResponse = receiver
f.WatchStop = stop f.WatchStop = stop
f.WatchIndex = waitIndex f.WatchIndex = waitIndex