mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 06:54:01 +00:00
convert etcd errors to watch errors
This commit is contained in:
parent
05eff2e910
commit
265f889260
@ -20,6 +20,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
@ -48,7 +49,8 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
|
||||
|
||||
// Watch begins watching the specified key. Events are decoded into
|
||||
// API objects and sent down the returned watch.Interface.
|
||||
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
|
||||
// Errors will be sent down the channel.
|
||||
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
|
||||
return h.WatchAndTransform(key, resourceVersion, nil)
|
||||
}
|
||||
|
||||
@ -67,10 +69,11 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
|
||||
// return value, nil
|
||||
// })
|
||||
//
|
||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
|
||||
// Errors will be sent down the channel.
|
||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w, <-w.immediateError
|
||||
return w
|
||||
}
|
||||
|
||||
// TransformFunc attempts to convert an object to another object for use with a watcher.
|
||||
@ -86,14 +89,10 @@ type etcdWatcher struct {
|
||||
filter FilterFunc
|
||||
|
||||
etcdIncoming chan *etcd.Response
|
||||
etcdError chan error
|
||||
etcdStop chan bool
|
||||
etcdCallEnded chan struct{}
|
||||
|
||||
// etcdWatch will send an error down this channel if the Watch fails.
|
||||
// Otherwise, a nil will be sent down this channel watchWaitDuration
|
||||
// after the watch starts.
|
||||
immediateError chan error
|
||||
|
||||
outgoing chan watch.Event
|
||||
userStop chan struct{}
|
||||
stopped bool
|
||||
@ -110,17 +109,16 @@ const watchWaitDuration = 100 * time.Millisecond
|
||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
encoding: encoding,
|
||||
versioner: versioner,
|
||||
transform: transform,
|
||||
list: list,
|
||||
filter: filter,
|
||||
etcdIncoming: make(chan *etcd.Response),
|
||||
etcdStop: make(chan bool),
|
||||
etcdCallEnded: make(chan struct{}),
|
||||
immediateError: make(chan error),
|
||||
outgoing: make(chan watch.Event),
|
||||
userStop: make(chan struct{}),
|
||||
encoding: encoding,
|
||||
versioner: versioner,
|
||||
transform: transform,
|
||||
list: list,
|
||||
filter: filter,
|
||||
etcdIncoming: make(chan *etcd.Response),
|
||||
etcdError: make(chan error, 1),
|
||||
etcdStop: make(chan bool),
|
||||
outgoing: make(chan watch.Event),
|
||||
userStop: make(chan struct{}),
|
||||
}
|
||||
w.emit = func(e watch.Event) { w.outgoing <- e }
|
||||
go w.translate()
|
||||
@ -128,46 +126,36 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versio
|
||||
}
|
||||
|
||||
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
||||
// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will
|
||||
// as a goroutine.
|
||||
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
|
||||
defer util.HandleCrash()
|
||||
defer close(w.etcdCallEnded)
|
||||
go func() {
|
||||
// This is racy; assume that Watch will fail within 100ms if it is going to fail.
|
||||
// It's still more useful than blocking until the first result shows up.
|
||||
// Trying to detect the 401: watch window expired error.
|
||||
<-time.After(watchWaitDuration)
|
||||
w.immediateError <- nil
|
||||
}()
|
||||
defer close(w.etcdError)
|
||||
if resourceVersion == 0 {
|
||||
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
|
||||
if !ok {
|
||||
latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return
|
||||
}
|
||||
resourceVersion = latest + 1
|
||||
}
|
||||
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
|
||||
if err != etcd.ErrWatchStoppedByUser {
|
||||
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
|
||||
w.immediateError <- err
|
||||
if err != nil && err != etcd.ErrWatchStoppedByUser {
|
||||
w.etcdError <- err
|
||||
}
|
||||
}
|
||||
|
||||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
||||
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
|
||||
success = true
|
||||
|
||||
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
resp, err := client.Get(key, false, recursive)
|
||||
if err != nil {
|
||||
if !IsEtcdNotFound(err) {
|
||||
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
|
||||
success = false
|
||||
return
|
||||
return resourceVersion, err
|
||||
}
|
||||
if index, ok := etcdErrorIndex(err); ok {
|
||||
resourceVersion = index
|
||||
}
|
||||
return
|
||||
return resourceVersion, nil
|
||||
}
|
||||
resourceVersion = resp.EtcdIndex
|
||||
convertRecursiveResponse(resp.Node, resp, incoming)
|
||||
@ -189,7 +177,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
|
||||
incoming <- &copied
|
||||
}
|
||||
|
||||
// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
|
||||
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
|
||||
// called as a goroutine.
|
||||
func (w *etcdWatcher) translate() {
|
||||
defer close(w.outgoing)
|
||||
@ -197,16 +185,26 @@ func (w *etcdWatcher) translate() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.etcdCallEnded:
|
||||
case err := <-w.etcdError:
|
||||
if err != nil {
|
||||
w.emit(watch.Event{
|
||||
watch.Error,
|
||||
&api.Status{
|
||||
Status: api.StatusFailure,
|
||||
Message: err.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
case <-w.userStop:
|
||||
w.etcdStop <- true
|
||||
return
|
||||
case res, ok := <-w.etcdIncoming:
|
||||
if !ok {
|
||||
return
|
||||
if ok {
|
||||
w.sendResult(res)
|
||||
}
|
||||
w.sendResult(res)
|
||||
// If !ok, don't return here-- must wait for etcdError channel
|
||||
// to give an error or be closed.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -205,10 +205,20 @@ func TestWatchEtcdError(t *testing.T) {
|
||||
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
|
||||
_, err := h.Watch("/some/key", 0)
|
||||
if err == nil {
|
||||
got := <-h.Watch("/some/key", 4).ResultChan()
|
||||
if got.Type != watch.Error {
|
||||
t.Fatalf("Unexpected non-error")
|
||||
}
|
||||
status, ok := got.Object.(*api.Status)
|
||||
if !ok {
|
||||
t.Fatalf("Unexpected non-error object type")
|
||||
}
|
||||
if status.Message != "immediate error" {
|
||||
t.Errorf("Unexpected wrong error")
|
||||
}
|
||||
if status.Status != api.StatusFailure {
|
||||
t.Errorf("Unexpected wrong error status")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
@ -217,10 +227,7 @@ func TestWatch(t *testing.T) {
|
||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
watching := h.Watch("/some/key", 0)
|
||||
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
// when server returns not found, the watch index starts at the next value (1)
|
||||
@ -249,6 +256,17 @@ func TestWatch(t *testing.T) {
|
||||
// Test error case
|
||||
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
|
||||
|
||||
if errEvent, ok := <-watching.ResultChan(); !ok {
|
||||
t.Errorf("no error result?")
|
||||
} else {
|
||||
if e, a := watch.Error, errEvent.Type; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := "Injected error", errEvent.Object.(*api.Status).Message; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
// Did everything shut down?
|
||||
if _, open := <-fakeClient.WatchResponse; open {
|
||||
t.Errorf("An injected error did not cause a graceful shutdown")
|
||||
@ -349,11 +367,7 @@ func TestWatchEtcdState(t *testing.T) {
|
||||
fakeClient.Data[key] = value
|
||||
}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
watching, err := h.Watch("/somekey/foo", testCase.From)
|
||||
if err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", k, err)
|
||||
continue
|
||||
}
|
||||
watching := h.Watch("/somekey/foo", testCase.From)
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
t.Logf("Testing %v", k)
|
||||
@ -421,10 +435,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||
fakeClient.Data["/some/key"] = testCase.Response
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("%s: unexpected error: %v", k, err)
|
||||
}
|
||||
watching := h.Watch("/some/key", 0)
|
||||
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
|
||||
@ -525,10 +536,7 @@ func TestWatchFromNotFound(t *testing.T) {
|
||||
}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
watching := h.Watch("/some/key", 0)
|
||||
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
if fakeClient.WatchIndex != 3 {
|
||||
@ -551,9 +559,14 @@ func TestWatchFromOtherError(t *testing.T) {
|
||||
}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
watching := h.Watch("/some/key", 0)
|
||||
|
||||
errEvent := <-watching.ResultChan()
|
||||
if e, a := watch.Error, errEvent.Type; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := "101: () [2]", errEvent.Object.(*api.Status).Message; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
select {
|
||||
@ -576,10 +589,7 @@ func TestWatchPurposefulShutdown(t *testing.T) {
|
||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||
|
||||
// Test purposeful shutdown
|
||||
watching, err := h.Watch("/some/key", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
watching := h.Watch("/some/key", 0)
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
watching.Stop()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user