mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 04:27:54 +00:00
Merge pull request #3535 from smarterclayton/ignore_the_root_key_on_watch_list
WatchList should not convey events for the root key
This commit is contained in:
commit
fc11801246
@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
|||||||
// watch.Interface. resourceVersion may be used to specify what version to begin
|
// watch.Interface. resourceVersion may be used to specify what version to begin
|
||||||
// watching (e.g., for reconnecting without missing any updates).
|
// watching (e.g., for reconnecting without missing any updates).
|
||||||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||||
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
|
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil)
|
||||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -90,7 +90,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
|
|||||||
//
|
//
|
||||||
// Errors will be sent down the channel.
|
// Errors will be sent down the channel.
|
||||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||||
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
|
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform)
|
||||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
@ -98,14 +98,25 @@ func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, trans
|
|||||||
// TransformFunc attempts to convert an object to another object for use with a watcher.
|
// TransformFunc attempts to convert an object to another object for use with a watcher.
|
||||||
type TransformFunc func(runtime.Object) (runtime.Object, error)
|
type TransformFunc func(runtime.Object) (runtime.Object, error)
|
||||||
|
|
||||||
|
// includeFunc returns true if the given key should be considered part of a watch
|
||||||
|
type includeFunc func(key string) bool
|
||||||
|
|
||||||
|
// exceptKey is an includeFunc that returns false when the provided key matches the watched key
|
||||||
|
func exceptKey(except string) includeFunc {
|
||||||
|
return func(key string) bool {
|
||||||
|
return key != except
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||||
type etcdWatcher struct {
|
type etcdWatcher struct {
|
||||||
encoding runtime.Codec
|
encoding runtime.Codec
|
||||||
versioner EtcdResourceVersioner
|
versioner EtcdResourceVersioner
|
||||||
transform TransformFunc
|
transform TransformFunc
|
||||||
|
|
||||||
list bool // If we're doing a recursive watch, should be true.
|
list bool // If we're doing a recursive watch, should be true.
|
||||||
filter FilterFunc
|
include includeFunc
|
||||||
|
filter FilterFunc
|
||||||
|
|
||||||
etcdIncoming chan *etcd.Response
|
etcdIncoming chan *etcd.Response
|
||||||
etcdError chan error
|
etcdError chan error
|
||||||
@ -126,12 +137,13 @@ const watchWaitDuration = 100 * time.Millisecond
|
|||||||
|
|
||||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
||||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||||
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
|
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
|
||||||
w := &etcdWatcher{
|
w := &etcdWatcher{
|
||||||
encoding: encoding,
|
encoding: encoding,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
transform: transform,
|
transform: transform,
|
||||||
list: list,
|
list: list,
|
||||||
|
include: include,
|
||||||
filter: filter,
|
filter: filter,
|
||||||
etcdIncoming: make(chan *etcd.Response),
|
etcdIncoming: make(chan *etcd.Response),
|
||||||
etcdError: make(chan error, 1),
|
etcdError: make(chan error, 1),
|
||||||
@ -258,6 +270,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
|||||||
glog.Errorf("unexpected nil node: %#v", res)
|
glog.Errorf("unexpected nil node: %#v", res)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if w.include != nil && !w.include(res.Node.Key) {
|
||||||
|
return
|
||||||
|
}
|
||||||
data := []byte(res.Node.Value)
|
data := []byte(res.Node.Value)
|
||||||
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
|
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -285,6 +300,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
|||||||
glog.Errorf("unexpected nil node: %#v", res)
|
glog.Errorf("unexpected nil node: %#v", res)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if w.include != nil && !w.include(res.Node.Key) {
|
||||||
|
return
|
||||||
|
}
|
||||||
curData := []byte(res.Node.Value)
|
curData := []byte(res.Node.Value)
|
||||||
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
|
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -331,6 +349,9 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
|||||||
glog.Errorf("unexpected nil prev node: %#v", res)
|
glog.Errorf("unexpected nil prev node: %#v", res)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if w.include != nil && !w.include(res.PrevNode.Key) {
|
||||||
|
return
|
||||||
|
}
|
||||||
data := []byte(res.PrevNode.Value)
|
data := []byte(res.PrevNode.Value)
|
||||||
index := res.PrevNode.ModifiedIndex
|
index := res.PrevNode.ModifiedIndex
|
||||||
if res.Node != nil {
|
if res.Node != nil {
|
||||||
|
@ -113,7 +113,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
|
|
||||||
for name, item := range table {
|
for name, item := range table {
|
||||||
for _, action := range item.actions {
|
for _, action := range item.actions {
|
||||||
w := newEtcdWatcher(true, firstLetterIsB, codec, versioner, nil)
|
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil)
|
||||||
emitCalled := false
|
emitCalled := false
|
||||||
w.emit = func(event watch.Event) {
|
w.emit = func(event watch.Event) {
|
||||||
emitCalled = true
|
emitCalled = true
|
||||||
@ -151,7 +151,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
||||||
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
|
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -165,7 +165,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
|||||||
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
|
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -179,7 +179,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
|||||||
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
|
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -524,6 +524,51 @@ func TestWatchListFromZeroIndex(t *testing.T) {
|
|||||||
watching.Stop()
|
watching.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||||
|
codec := latest.Codec
|
||||||
|
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||||
|
|
||||||
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
|
watching, err := h.WatchList("/some/key", 1, Everything)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
|
||||||
|
// This is the root directory of the watch, which happens to have a value encoded
|
||||||
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
|
Action: "delete",
|
||||||
|
PrevNode: &etcd.Node{
|
||||||
|
Key: "/some/key",
|
||||||
|
Value: runtime.EncodeOrDie(codec, pod),
|
||||||
|
CreatedIndex: 1,
|
||||||
|
ModifiedIndex: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// Delete of the parent directory of a key is an event that a list watch would receive,
|
||||||
|
// but will have no value so the decode will fail.
|
||||||
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
|
Action: "delete",
|
||||||
|
PrevNode: &etcd.Node{
|
||||||
|
Key: "/some/key",
|
||||||
|
Value: "",
|
||||||
|
CreatedIndex: 1,
|
||||||
|
ModifiedIndex: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
close(fakeClient.WatchStop)
|
||||||
|
|
||||||
|
// the existing node is detected and the index set
|
||||||
|
_, open := <-watching.ResultChan()
|
||||||
|
if open {
|
||||||
|
t.Fatalf("unexpected channel open")
|
||||||
|
}
|
||||||
|
|
||||||
|
watching.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatchFromNotFound(t *testing.T) {
|
func TestWatchFromNotFound(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||||
|
Loading…
Reference in New Issue
Block a user