mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 05:21:58 +00:00
Merge pull request #1404 from lavalamp/scheduler
Add error type to watch.
This commit is contained in:
commit
b0b7f13ade
@ -21,6 +21,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// statusError is an error intended for consumption by a REST API server.
|
// statusError is an error intended for consumption by a REST API server.
|
||||||
@ -38,6 +39,16 @@ func (e *statusError) Status() api.Status {
|
|||||||
return e.status
|
return e.status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FromObject generates an statusError from an api.Status, if that is the type of obj; otherwise,
|
||||||
|
// returns an error created by fmt.Errorf.
|
||||||
|
func FromObject(obj runtime.Object) error {
|
||||||
|
switch t := obj.(type) {
|
||||||
|
case *api.Status:
|
||||||
|
return &statusError{*t}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unexpected object: %v", obj)
|
||||||
|
}
|
||||||
|
|
||||||
// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
|
// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
|
||||||
func NewNotFound(kind, name string) error {
|
func NewNotFound(kind, name string) error {
|
||||||
return &statusError{api.Status{
|
return &statusError{api.Status{
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestErrorNew(t *testing.T) {
|
func TestErrorNew(t *testing.T) {
|
||||||
@ -131,3 +132,23 @@ func Test_reasonForError(t *testing.T) {
|
|||||||
t.Errorf("unexpected reason type: %#v", a)
|
t.Errorf("unexpected reason type: %#v", a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TestType struct{}
|
||||||
|
|
||||||
|
func (*TestType) IsAnAPIObject() {}
|
||||||
|
|
||||||
|
func TestFromObject(t *testing.T) {
|
||||||
|
table := []struct {
|
||||||
|
obj runtime.Object
|
||||||
|
message string
|
||||||
|
}{
|
||||||
|
{&api.Status{Message: "foobar"}, "foobar"},
|
||||||
|
{&TestType{}, "unexpected object: &{}"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
if e, a := item.message, FromObject(item.obj).Error(); e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
8
pkg/client/cache/reflector.go
vendored
8
pkg/client/cache/reflector.go
vendored
@ -22,6 +22,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
@ -101,7 +102,7 @@ func (r *Reflector) listAndWatch() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := r.watchHandler(w, &resourceVersion); err != nil {
|
if err := r.watchHandler(w, &resourceVersion); err != nil {
|
||||||
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
|
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,6 +132,9 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err
|
|||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
if event.Type == watch.Error {
|
||||||
|
return apierrs.FromObject(event.Object)
|
||||||
|
}
|
||||||
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||||
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
||||||
continue
|
continue
|
||||||
@ -162,6 +166,6 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err
|
|||||||
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
|
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
|
||||||
return errors.New("very short watch")
|
return errors.New("very short watch")
|
||||||
}
|
}
|
||||||
glog.Infof("unexpected watch close - %v total items received", eventCount)
|
glog.Infof("watch close - %v total items received", eventCount)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -60,18 +60,18 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceEtcd) run() {
|
func (s *SourceEtcd) run() {
|
||||||
watching, err := s.helper.Watch(s.key, 0)
|
watching := s.helper.Watch(s.key, 0)
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to initialize etcd watch: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event, ok := <-watching.ResultChan():
|
case event, ok := <-watching.ResultChan():
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if event.Type == watch.Error {
|
||||||
|
glog.Errorf("Watch error: %v", event.Object)
|
||||||
|
watching.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
pods, err := eventToPods(event)
|
pods, err := eventToPods(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to parse result from etcd watch: %v", err)
|
glog.Errorf("Failed to parse result from etcd watch: %v", err)
|
||||||
|
@ -343,7 +343,7 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
|
|||||||
return nil, fmt.Errorf("label selectors are not supported on services")
|
return nil, fmt.Errorf("label selectors are not supported on services")
|
||||||
}
|
}
|
||||||
if value, found := field.RequiresExactMatch("ID"); found {
|
if value, found := field.RequiresExactMatch("ID"); found {
|
||||||
return r.Watch(makeServiceKey(value), resourceVersion)
|
return r.Watch(makeServiceKey(value), resourceVersion), nil
|
||||||
}
|
}
|
||||||
if field.Empty() {
|
if field.Empty() {
|
||||||
return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything)
|
return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything)
|
||||||
@ -375,7 +375,7 @@ func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion
|
|||||||
return nil, fmt.Errorf("label selectors are not supported on endpoints")
|
return nil, fmt.Errorf("label selectors are not supported on endpoints")
|
||||||
}
|
}
|
||||||
if value, found := field.RequiresExactMatch("ID"); found {
|
if value, found := field.RequiresExactMatch("ID"); found {
|
||||||
return r.Watch(makeServiceEndpointsKey(value), resourceVersion)
|
return r.Watch(makeServiceEndpointsKey(value), resourceVersion), nil
|
||||||
}
|
}
|
||||||
if field.Empty() {
|
if field.Empty() {
|
||||||
return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything)
|
return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything)
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
@ -48,7 +49,8 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
|
|||||||
|
|
||||||
// Watch begins watching the specified key. Events are decoded into
|
// Watch begins watching the specified key. Events are decoded into
|
||||||
// API objects and sent down the returned watch.Interface.
|
// API objects and sent down the returned watch.Interface.
|
||||||
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
|
// Errors will be sent down the channel.
|
||||||
|
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
|
||||||
return h.WatchAndTransform(key, resourceVersion, nil)
|
return h.WatchAndTransform(key, resourceVersion, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,10 +69,11 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
|
|||||||
// return value, nil
|
// return value, nil
|
||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
|
// Errors will be sent down the channel.
|
||||||
|
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||||
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
|
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
|
||||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||||
return w, <-w.immediateError
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
// TransformFunc attempts to convert an object to another object for use with a watcher.
|
// TransformFunc attempts to convert an object to another object for use with a watcher.
|
||||||
@ -86,14 +89,10 @@ type etcdWatcher struct {
|
|||||||
filter FilterFunc
|
filter FilterFunc
|
||||||
|
|
||||||
etcdIncoming chan *etcd.Response
|
etcdIncoming chan *etcd.Response
|
||||||
|
etcdError chan error
|
||||||
etcdStop chan bool
|
etcdStop chan bool
|
||||||
etcdCallEnded chan struct{}
|
etcdCallEnded chan struct{}
|
||||||
|
|
||||||
// etcdWatch will send an error down this channel if the Watch fails.
|
|
||||||
// Otherwise, a nil will be sent down this channel watchWaitDuration
|
|
||||||
// after the watch starts.
|
|
||||||
immediateError chan error
|
|
||||||
|
|
||||||
outgoing chan watch.Event
|
outgoing chan watch.Event
|
||||||
userStop chan struct{}
|
userStop chan struct{}
|
||||||
stopped bool
|
stopped bool
|
||||||
@ -110,17 +109,16 @@ const watchWaitDuration = 100 * time.Millisecond
|
|||||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||||
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
|
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
|
||||||
w := &etcdWatcher{
|
w := &etcdWatcher{
|
||||||
encoding: encoding,
|
encoding: encoding,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
transform: transform,
|
transform: transform,
|
||||||
list: list,
|
list: list,
|
||||||
filter: filter,
|
filter: filter,
|
||||||
etcdIncoming: make(chan *etcd.Response),
|
etcdIncoming: make(chan *etcd.Response),
|
||||||
etcdStop: make(chan bool),
|
etcdError: make(chan error, 1),
|
||||||
etcdCallEnded: make(chan struct{}),
|
etcdStop: make(chan bool),
|
||||||
immediateError: make(chan error),
|
outgoing: make(chan watch.Event),
|
||||||
outgoing: make(chan watch.Event),
|
userStop: make(chan struct{}),
|
||||||
userStop: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
w.emit = func(e watch.Event) { w.outgoing <- e }
|
w.emit = func(e watch.Event) { w.outgoing <- e }
|
||||||
go w.translate()
|
go w.translate()
|
||||||
@ -128,46 +126,36 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versio
|
|||||||
}
|
}
|
||||||
|
|
||||||
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
||||||
// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will
|
// as a goroutine.
|
||||||
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
|
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
|
||||||
defer util.HandleCrash()
|
defer util.HandleCrash()
|
||||||
defer close(w.etcdCallEnded)
|
defer close(w.etcdError)
|
||||||
go func() {
|
|
||||||
// This is racy; assume that Watch will fail within 100ms if it is going to fail.
|
|
||||||
// It's still more useful than blocking until the first result shows up.
|
|
||||||
// Trying to detect the 401: watch window expired error.
|
|
||||||
<-time.After(watchWaitDuration)
|
|
||||||
w.immediateError <- nil
|
|
||||||
}()
|
|
||||||
if resourceVersion == 0 {
|
if resourceVersion == 0 {
|
||||||
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
|
latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
|
||||||
if !ok {
|
if err != nil {
|
||||||
|
w.etcdError <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resourceVersion = latest + 1
|
resourceVersion = latest + 1
|
||||||
}
|
}
|
||||||
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
|
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
|
||||||
if err != etcd.ErrWatchStoppedByUser {
|
if err != nil && err != etcd.ErrWatchStoppedByUser {
|
||||||
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
|
w.etcdError <- err
|
||||||
w.immediateError <- err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
||||||
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
|
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||||
success = true
|
|
||||||
|
|
||||||
resp, err := client.Get(key, false, recursive)
|
resp, err := client.Get(key, false, recursive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !IsEtcdNotFound(err) {
|
if !IsEtcdNotFound(err) {
|
||||||
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
|
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
|
||||||
success = false
|
return resourceVersion, err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if index, ok := etcdErrorIndex(err); ok {
|
if index, ok := etcdErrorIndex(err); ok {
|
||||||
resourceVersion = index
|
resourceVersion = index
|
||||||
}
|
}
|
||||||
return
|
return resourceVersion, nil
|
||||||
}
|
}
|
||||||
resourceVersion = resp.EtcdIndex
|
resourceVersion = resp.EtcdIndex
|
||||||
convertRecursiveResponse(resp.Node, resp, incoming)
|
convertRecursiveResponse(resp.Node, resp, incoming)
|
||||||
@ -189,7 +177,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
|
|||||||
incoming <- &copied
|
incoming <- &copied
|
||||||
}
|
}
|
||||||
|
|
||||||
// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
|
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
|
||||||
// called as a goroutine.
|
// called as a goroutine.
|
||||||
func (w *etcdWatcher) translate() {
|
func (w *etcdWatcher) translate() {
|
||||||
defer close(w.outgoing)
|
defer close(w.outgoing)
|
||||||
@ -197,16 +185,26 @@ func (w *etcdWatcher) translate() {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-w.etcdCallEnded:
|
case err := <-w.etcdError:
|
||||||
|
if err != nil {
|
||||||
|
w.emit(watch.Event{
|
||||||
|
watch.Error,
|
||||||
|
&api.Status{
|
||||||
|
Status: api.StatusFailure,
|
||||||
|
Message: err.Error(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case <-w.userStop:
|
case <-w.userStop:
|
||||||
w.etcdStop <- true
|
w.etcdStop <- true
|
||||||
return
|
return
|
||||||
case res, ok := <-w.etcdIncoming:
|
case res, ok := <-w.etcdIncoming:
|
||||||
if !ok {
|
if ok {
|
||||||
return
|
w.sendResult(res)
|
||||||
}
|
}
|
||||||
w.sendResult(res)
|
// If !ok, don't return here-- must wait for etcdError channel
|
||||||
|
// to give an error or be closed.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -205,10 +205,20 @@ func TestWatchEtcdError(t *testing.T) {
|
|||||||
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
|
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
|
||||||
h := EtcdHelper{fakeClient, codec, versioner}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
_, err := h.Watch("/some/key", 0)
|
got := <-h.Watch("/some/key", 4).ResultChan()
|
||||||
if err == nil {
|
if got.Type != watch.Error {
|
||||||
t.Fatalf("Unexpected non-error")
|
t.Fatalf("Unexpected non-error")
|
||||||
}
|
}
|
||||||
|
status, ok := got.Object.(*api.Status)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Unexpected non-error object type")
|
||||||
|
}
|
||||||
|
if status.Message != "immediate error" {
|
||||||
|
t.Errorf("Unexpected wrong error")
|
||||||
|
}
|
||||||
|
if status.Status != api.StatusFailure {
|
||||||
|
t.Errorf("Unexpected wrong error status")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
@ -217,10 +227,7 @@ func TestWatch(t *testing.T) {
|
|||||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
watching, err := h.Watch("/some/key", 0)
|
watching := h.Watch("/some/key", 0)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
fakeClient.WaitForWatchCompletion()
|
||||||
// when server returns not found, the watch index starts at the next value (1)
|
// when server returns not found, the watch index starts at the next value (1)
|
||||||
@ -249,6 +256,17 @@ func TestWatch(t *testing.T) {
|
|||||||
// Test error case
|
// Test error case
|
||||||
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
|
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
|
||||||
|
|
||||||
|
if errEvent, ok := <-watching.ResultChan(); !ok {
|
||||||
|
t.Errorf("no error result?")
|
||||||
|
} else {
|
||||||
|
if e, a := watch.Error, errEvent.Type; e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
if e, a := "Injected error", errEvent.Object.(*api.Status).Message; e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Did everything shut down?
|
// Did everything shut down?
|
||||||
if _, open := <-fakeClient.WatchResponse; open {
|
if _, open := <-fakeClient.WatchResponse; open {
|
||||||
t.Errorf("An injected error did not cause a graceful shutdown")
|
t.Errorf("An injected error did not cause a graceful shutdown")
|
||||||
@ -349,11 +367,7 @@ func TestWatchEtcdState(t *testing.T) {
|
|||||||
fakeClient.Data[key] = value
|
fakeClient.Data[key] = value
|
||||||
}
|
}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
watching, err := h.Watch("/somekey/foo", testCase.From)
|
watching := h.Watch("/somekey/foo", testCase.From)
|
||||||
if err != nil {
|
|
||||||
t.Errorf("%s: unexpected error: %v", k, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
|
||||||
t.Logf("Testing %v", k)
|
t.Logf("Testing %v", k)
|
||||||
@ -421,10 +435,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
|||||||
fakeClient.Data["/some/key"] = testCase.Response
|
fakeClient.Data["/some/key"] = testCase.Response
|
||||||
h := EtcdHelper{fakeClient, codec, versioner}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
watching, err := h.Watch("/some/key", 0)
|
watching := h.Watch("/some/key", 0)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s: unexpected error: %v", k, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
fakeClient.WaitForWatchCompletion()
|
||||||
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
|
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
|
||||||
@ -525,10 +536,7 @@ func TestWatchFromNotFound(t *testing.T) {
|
|||||||
}
|
}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
watching, err := h.Watch("/some/key", 0)
|
watching := h.Watch("/some/key", 0)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
fakeClient.WaitForWatchCompletion()
|
||||||
if fakeClient.WatchIndex != 3 {
|
if fakeClient.WatchIndex != 3 {
|
||||||
@ -551,9 +559,14 @@ func TestWatchFromOtherError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
watching, err := h.Watch("/some/key", 0)
|
watching := h.Watch("/some/key", 0)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
errEvent := <-watching.ResultChan()
|
||||||
|
if e, a := watch.Error, errEvent.Type; e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
if e, a := "101: () [2]", errEvent.Object.(*api.Status).Message; e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -576,10 +589,7 @@ func TestWatchPurposefulShutdown(t *testing.T) {
|
|||||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||||
|
|
||||||
// Test purposeful shutdown
|
// Test purposeful shutdown
|
||||||
watching, err := h.Watch("/some/key", 0)
|
watching := h.Watch("/some/key", 0)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
fakeClient.WaitForWatchCompletion()
|
||||||
watching.Stop()
|
watching.Stop()
|
||||||
|
|
||||||
|
@ -33,6 +33,7 @@ type watchEvent struct {
|
|||||||
|
|
||||||
// For added or modified objects, this is the new object; for deleted objects,
|
// For added or modified objects, this is the new object; for deleted objects,
|
||||||
// it's the state of the object immediately prior to its deletion.
|
// it's the state of the object immediately prior to its deletion.
|
||||||
|
// For errors, it's an api.Status.
|
||||||
Object runtime.RawExtension `json:"object,omitempty" yaml:"object,omitempty"`
|
Object runtime.RawExtension `json:"object,omitempty" yaml:"object,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,14 +41,18 @@ const (
|
|||||||
Added EventType = "ADDED"
|
Added EventType = "ADDED"
|
||||||
Modified EventType = "MODIFIED"
|
Modified EventType = "MODIFIED"
|
||||||
Deleted EventType = "DELETED"
|
Deleted EventType = "DELETED"
|
||||||
|
Error EventType = "ERROR"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Event represents a single event to a watched resource.
|
// Event represents a single event to a watched resource.
|
||||||
type Event struct {
|
type Event struct {
|
||||||
Type EventType
|
Type EventType
|
||||||
|
|
||||||
// If Type == Deleted, then this is the state of the object
|
// Object is:
|
||||||
// immediately before deletion.
|
// * If Type is Added or Modified: the new state of the object.
|
||||||
|
// * If Type is Deleted: the state of the object immediately before deletion.
|
||||||
|
// * If Type is Error: *api.Status is recommended; other types may make sense
|
||||||
|
// depending on context.
|
||||||
Object runtime.Object
|
Object runtime.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,6 +98,11 @@ func (f *FakeWatcher) Delete(lastValue runtime.Object) {
|
|||||||
f.result <- Event{Deleted, lastValue}
|
f.result <- Event{Deleted, lastValue}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Error sends an Error event.
|
||||||
|
func (f *FakeWatcher) Error(errValue runtime.Object) {
|
||||||
|
f.result <- Event{Error, errValue}
|
||||||
|
}
|
||||||
|
|
||||||
// Action sends an event of the requested type, for table-based testing.
|
// Action sends an event of the requested type, for table-based testing.
|
||||||
func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
|
func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
|
||||||
f.result <- Event{action, obj}
|
f.result <- Event{action, obj}
|
||||||
|
@ -35,6 +35,7 @@ func TestFake(t *testing.T) {
|
|||||||
{Modified, testType("qux")},
|
{Modified, testType("qux")},
|
||||||
{Modified, testType("bar")},
|
{Modified, testType("bar")},
|
||||||
{Deleted, testType("bar")},
|
{Deleted, testType("bar")},
|
||||||
|
{Error, testType("error: blah")},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prove that f implements Interface by phrasing this as a function.
|
// Prove that f implements Interface by phrasing this as a function.
|
||||||
@ -62,6 +63,7 @@ func TestFake(t *testing.T) {
|
|||||||
f.Action(Modified, testType("qux"))
|
f.Action(Modified, testType("qux"))
|
||||||
f.Modify(testType("bar"))
|
f.Modify(testType("bar"))
|
||||||
f.Delete(testType("bar"))
|
f.Delete(testType("bar"))
|
||||||
|
f.Error(testType("error: blah"))
|
||||||
f.Stop()
|
f.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,10 +101,7 @@ func TestWatch(t *testing.T) {
|
|||||||
expectedVersion := resp.Node.ModifiedIndex
|
expectedVersion := resp.Node.ModifiedIndex
|
||||||
|
|
||||||
// watch should load the object at the current index
|
// watch should load the object at the current index
|
||||||
w, err := helper.Watch(key, 0)
|
w := helper.Watch(key, 0)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
event := <-w.ResultChan()
|
event := <-w.ResultChan()
|
||||||
if event.Type != watch.Added || event.Object == nil {
|
if event.Type != watch.Added || event.Object == nil {
|
||||||
t.Fatalf("expected first value to be set to ADDED, got %#v", event)
|
t.Fatalf("expected first value to be set to ADDED, got %#v", event)
|
||||||
|
Loading…
Reference in New Issue
Block a user