From 72b35816cde6df548c04a737bb893dbcc79f2c8a Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 21 Aug 2014 18:16:46 -0700 Subject: [PATCH] Need to remove pods that change labels. --- pkg/registry/etcd/etcd.go | 11 +- pkg/registry/pod/registry.go | 2 +- pkg/registry/pod/storage.go | 12 +- pkg/registry/registrytest/pod.go | 3 +- pkg/tools/etcd_tools.go | 158 ++++++++++++++------ pkg/tools/etcd_tools_test.go | 247 +++++++++++++++++++------------ 6 files changed, 276 insertions(+), 157 deletions(-) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 371454a9ba4..41407b65c3b 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -79,8 +79,15 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { } // WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(resourceVersion uint64) (watch.Interface, error) { - return r.WatchList("/registry/pods", resourceVersion, tools.Everything) +func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { + return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { + pod, ok := obj.(*api.Pod) + if !ok { + glog.Errorf("Unexpected object during pod watch: %#v", obj) + return false + } + return filter(pod) + }) } // GetPod gets a specific pod specified by its ID. diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index d65f82a080b..2aaf488deab 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -27,7 +27,7 @@ type Registry interface { // ListPods obtains a list of pods that match selector. ListPods(selector labels.Selector) ([]api.Pod, error) // Watch for new/changed/deleted pods - WatchPods(resourceVersion uint64) (watch.Interface, error) + WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) // Create a pod based on a specification. diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 2f9410c8d8c..eec4e143c2d 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -118,20 +118,14 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { // Watch begins watching for new, changed, or deleted pods. func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - source, err := rs.registry.WatchPods(resourceVersion) - if err != nil { - return nil, err - } - return watch.Filter(source, func(e watch.Event) (watch.Event, bool) { - pod := e.Object.(*api.Pod) + return rs.registry.WatchPods(resourceVersion, func(pod *api.Pod) bool { fields := labels.Set{ "ID": pod.ID, "DesiredState.Status": string(pod.DesiredState.Status), "DesiredState.Host": pod.DesiredState.Host, } - passesFilter := label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) - return e, passesFilter - }), nil + return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + }) } func (rs RegistryStorage) New() interface{} { diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 6efaaf7fa87..d097e7dafad 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -55,7 +55,8 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filtered, nil } -func (r *PodRegistry) WatchPods(resourceVersion uint64) (watch.Interface, error) { +func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { + // TODO: wire filter down into the mux; it needs access to current and previous state :( return r.mux.Watch(), nil } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 358d69ebefb..108884f5a68 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -343,7 +343,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, // }) // func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { - w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform) + w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -442,11 +442,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming return } copied := *response - if node.ModifiedIndex == node.CreatedIndex { - copied.Action = "create" - } else { - copied.Action = "set" - } + copied.Action = "get" copied.Node = node incoming <- &copied } @@ -473,46 +469,10 @@ func (w *etcdWatcher) translate() { } } -func (w *etcdWatcher) sendResult(res *etcd.Response) { - var action watch.EventType - var data []byte - var index uint64 - switch res.Action { - case "create": - if res.Node == nil { - glog.Errorf("unexpected nil node: %#v", res) - return - } - data = []byte(res.Node.Value) - index = res.Node.ModifiedIndex - action = watch.Added - case "set", "compareAndSwap", "get": - if res.Node == nil { - glog.Errorf("unexpected nil node: %#v", res) - return - } - data = []byte(res.Node.Value) - index = res.Node.ModifiedIndex - action = watch.Modified - case "delete": - if res.PrevNode == nil { - glog.Errorf("unexpected nil prev node: %#v", res) - return - } - data = []byte(res.PrevNode.Value) - index = res.Node.ModifiedIndex - action = watch.Deleted - default: - glog.Errorf("unknown action: %v", res.Action) - return - } - +func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, error) { obj, err := w.encoding.Decode(data) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node) - // TODO: expose an error through watch.Interface? - w.Stop() - return + return nil, err } // ensure resource version is set on the object we load from etcd @@ -527,18 +487,122 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { obj, err = w.transform(obj) if err != nil { glog.Errorf("failure to transform api object %#v: %v", obj, err) - // TODO: expose an error through watch.Interface? - w.Stop() - return + return nil, err } } + return obj, nil +} + +func (w *etcdWatcher) sendCreate(res *etcd.Response) { + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return + } + data := []byte(res.Node.Value) + obj, err := w.decodeObject(data, res.Node.ModifiedIndex) + if err != nil { + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node) + // TODO: expose an error through watch.Interface? + // Ignore this value. If we stop the watch on a bad value, a client that uses + // the resourceVersion to resume will never be able to get past a bad value. + return + } + if !w.filter(obj) { + return + } + action := watch.Added + if res.Node.ModifiedIndex != res.Node.CreatedIndex { + action = watch.Modified + } w.emit(watch.Event{ Type: action, Object: obj, }) } +func (w *etcdWatcher) sendModify(res *etcd.Response) { + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return + } + curData := []byte(res.Node.Value) + curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex) + if err != nil { + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node) + // TODO: expose an error through watch.Interface? + // Ignore this value. If we stop the watch on a bad value, a client that uses + // the resourceVersion to resume will never be able to get past a bad value. + return + } + curObjPasses := w.filter(curObj) + oldObjPasses := false + var oldObj interface{} + if res.PrevNode != nil && res.PrevNode.Value != "" { + // Ignore problems reading the old object. + if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil { + oldObjPasses = w.filter(oldObj) + } + } + // Some changes to an object may cause it to start or stop matching a filter. + // We need to report those as adds/deletes. So we have to check both the previous + // and current value of the object. + switch { + case curObjPasses && oldObjPasses: + w.emit(watch.Event{ + Type: watch.Modified, + Object: curObj, + }) + case curObjPasses && !oldObjPasses: + w.emit(watch.Event{ + Type: watch.Added, + Object: curObj, + }) + case !curObjPasses && oldObjPasses: + w.emit(watch.Event{ + Type: watch.Deleted, + Object: oldObj, + }) + } + // Do nothing if neither new nor old object passed the filter. +} + +func (w *etcdWatcher) sendDelete(res *etcd.Response) { + if res.PrevNode == nil { + glog.Errorf("unexpected nil prev node: %#v", res) + return + } + data := []byte(res.PrevNode.Value) + obj, err := w.decodeObject(data, res.PrevNode.ModifiedIndex) + if err != nil { + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode) + // TODO: expose an error through watch.Interface? + // Ignore this value. If we stop the watch on a bad value, a client that uses + // the resourceVersion to resume will never be able to get past a bad value. + return + } + if !w.filter(obj) { + return + } + w.emit(watch.Event{ + Type: watch.Deleted, + Object: obj, + }) +} + +func (w *etcdWatcher) sendResult(res *etcd.Response) { + switch res.Action { + case "create", "get": + w.sendCreate(res) + case "set", "compareAndSwap": + w.sendModify(res) + case "delete": + w.sendDelete(res) + default: + glog.Errorf("unknown action: %v", res.Action) + } +} + // ResultChannel implements watch.Interface. func (w *etcdWatcher) ResultChan() <-chan watch.Event { return w.outgoing diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 2b36bc8e52d..f11b7ea9378 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -365,88 +365,127 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { } } -func TestWatchInterpretation_ListCreate(t *testing.T) { - w := newEtcdWatcher(true, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := codec.Encode(pod) +func TestWatchInterpretations(t *testing.T) { + // Declare some pods to make the test cases compact. + podFoo := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBar := &api.Pod{JSONBase: api.JSONBase{ID: "bar"}} + podBaz := &api.Pod{JSONBase: api.JSONBase{ID: "baz"}} + firstLetterIsB := func(obj interface{}) bool { + return obj.(*api.Pod).ID[0] == 'b' + } - go w.sendResult(&etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(podBytes), + // All of these test cases will be run with the firstLetterIsB FilterFunc. + table := map[string]struct { + actions []string // Run this test item for every action here. + prevNodeValue string + nodeValue string + expectEmit bool + expectType watch.EventType + expectObject interface{} + }{ + "create": { + actions: []string{"create", "get"}, + nodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Added, + expectObject: podBar, }, - }) - - got := <-w.outgoing - if e, a := watch.Added, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } -} - -func TestWatchInterpretation_ListAdd(t *testing.T) { - w := newEtcdWatcher(true, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := codec.Encode(pod) - - go w.sendResult(&etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(podBytes), + "create but filter blocks": { + actions: []string{"create", "get"}, + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: false, }, - }) - - got := <-w.outgoing - if e, a := watch.Modified, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } -} - -func TestWatchInterpretation_Delete(t *testing.T) { - w := newEtcdWatcher(true, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := codec.Encode(pod) - - go w.sendResult(&etcd.Response{ - Action: "delete", - Node: &etcd.Node{ - ModifiedIndex: 2, + "delete": { + actions: []string{"delete"}, + prevNodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Deleted, + expectObject: podBar, }, - PrevNode: &etcd.Node{ - Value: string(podBytes), - ModifiedIndex: 1, + "delete but filter blocks": { + actions: []string{"delete"}, + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: false, + }, + "modify appears to create 1": { + actions: []string{"set", "compareAndSwap"}, + nodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Added, + expectObject: podBar, + }, + "modify appears to create 2": { + actions: []string{"set", "compareAndSwap"}, + prevNodeValue: api.EncodeOrDie(podFoo), + nodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Added, + expectObject: podBar, + }, + "modify appears to delete": { + actions: []string{"set", "compareAndSwap"}, + prevNodeValue: api.EncodeOrDie(podBar), + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: true, + expectType: watch.Deleted, + expectObject: podBar, // Should return last state that passed the filter! + }, + "modify modifies": { + actions: []string{"set", "compareAndSwap"}, + prevNodeValue: api.EncodeOrDie(podBar), + nodeValue: api.EncodeOrDie(podBaz), + expectEmit: true, + expectType: watch.Modified, + expectObject: podBaz, + }, + "modify ignores": { + actions: []string{"set", "compareAndSwap"}, + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: false, }, - }) - - got := <-w.outgoing - if e, a := watch.Deleted, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) } - pod.ResourceVersion = 2 - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) + + for name, item := range table { + for _, action := range item.actions { + w := newEtcdWatcher(true, firstLetterIsB, codec, versioner, nil) + emitCalled := false + w.emit = func(event watch.Event) { + emitCalled = true + if !item.expectEmit { + return + } + if e, a := item.expectType, event.Type; e != a { + t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) + } + if e, a := item.expectObject, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) + } + } + + var n, pn *etcd.Node + if item.nodeValue != "" { + n = &etcd.Node{Value: item.nodeValue} + } + if item.prevNodeValue != "" { + pn = &etcd.Node{Value: item.prevNodeValue} + } + + w.sendResult(&etcd.Response{ + Action: action, + Node: n, + PrevNode: pn, + }) + + if e, a := item.expectEmit, emitCalled; e != a { + t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) + } + w.Stop() + } } } func TestWatchInterpretation_ResponseNotSet(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) + w := newEtcdWatcher(false, Everything, codec, versioner, nil) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -454,35 +493,44 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { w.sendResult(&etcd.Response{ Action: "update", }) + w.Stop() } func TestWatchInterpretation_ResponseNoNode(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) + actions := []string{"create", "set", "compareAndSwap", "delete"} + for _, action := range actions { + w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + w.sendResult(&etcd.Response{ + Action: action, + }) + w.Stop() } - w.sendResult(&etcd.Response{ - Action: "set", - }) } func TestWatchInterpretation_ResponseBadData(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) + actions := []string{"create", "set", "compareAndSwap", "delete"} + for _, action := range actions { + w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + w.sendResult(&etcd.Response{ + Action: action, + Node: &etcd.Node{ + Value: "foobar", + }, + }) + w.sendResult(&etcd.Response{ + Action: action, + PrevNode: &etcd.Node{ + Value: "foobar", + }, + }) + w.Stop() } - w.sendResult(&etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: "foobar", - }, - }) } func TestWatch(t *testing.T) { @@ -512,7 +560,7 @@ func TestWatch(t *testing.T) { } event := <-watching.ResultChan() - if e, a := watch.Modified, event.Type; e != a { + if e, a := watch.Added, event.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { @@ -617,11 +665,13 @@ func TestWatchListFromZeroIndex(t *testing.T) { Nodes: etcd.Nodes{ &etcd.Node{ Value: api.EncodeOrDie(pod), + CreatedIndex: 1, ModifiedIndex: 1, Nodes: etcd.Nodes{}, }, &etcd.Node{ Value: api.EncodeOrDie(pod), + CreatedIndex: 2, ModifiedIndex: 2, Nodes: etcd.Nodes{}, }, @@ -633,15 +683,18 @@ func TestWatchListFromZeroIndex(t *testing.T) { } h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.WatchList("/some/key", 0, nil) + watching, err := h.WatchList("/some/key", 0, Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } // the existing node is detected and the index set - event := <-watching.ResultChan() + event, open := <-watching.ResultChan() + if !open { + t.Fatalf("unexpected channel close") + } for i := 0; i < 2; i++ { - if e, a := watch.Modified, event.Type; e != a { + if e, a := watch.Added, event.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } actualPod, ok := event.Object.(*api.Pod)