Merge pull request #1000 from lavalamp/fix2

Need to remove pods that change labels.
This commit is contained in:
brendandburns 2014-08-25 21:20:33 -07:00
commit 6f84f6d92f
9 changed files with 823 additions and 649 deletions

View File

@ -79,8 +79,15 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) {
}
// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(resourceVersion uint64) (watch.Interface, error) {
return r.WatchList("/registry/pods", resourceVersion, tools.Everything)
func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool {
pod, ok := obj.(*api.Pod)
if !ok {
glog.Errorf("Unexpected object during pod watch: %#v", obj)
return false
}
return filter(pod)
})
}
// GetPod gets a specific pod specified by its ID.

View File

@ -27,7 +27,7 @@ type Registry interface {
// ListPods obtains a list of pods that match selector.
ListPods(selector labels.Selector) ([]api.Pod, error)
// Watch for new/changed/deleted pods
WatchPods(resourceVersion uint64) (watch.Interface, error)
WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
// Get a specific pod
GetPod(podID string) (*api.Pod, error)
// Create a pod based on a specification.

View File

@ -118,20 +118,14 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
// Watch begins watching for new, changed, or deleted pods.
func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
source, err := rs.registry.WatchPods(resourceVersion)
if err != nil {
return nil, err
}
return watch.Filter(source, func(e watch.Event) (watch.Event, bool) {
pod := e.Object.(*api.Pod)
return rs.registry.WatchPods(resourceVersion, func(pod *api.Pod) bool {
fields := labels.Set{
"ID": pod.ID,
"DesiredState.Status": string(pod.DesiredState.Status),
"DesiredState.Host": pod.DesiredState.Host,
}
passesFilter := label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
return e, passesFilter
}), nil
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
})
}
func (rs RegistryStorage) New() interface{} {

View File

@ -55,7 +55,8 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
return filtered, nil
}
func (r *PodRegistry) WatchPods(resourceVersion uint64) (watch.Interface, error) {
func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.mux.Watch(), nil
}

View File

@ -20,12 +20,8 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
const (
@ -301,256 +297,3 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
return err
}
}
// FilterFunc is a predicate which takes an API object and returns true
// iff the object should remain in the set.
type FilterFunc func(obj interface{}) bool
// Everything is a FilterFunc which accepts all objects.
func Everything(interface{}) bool {
return true
}
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
return h.WatchAndTransform(key, resourceVersion, nil)
}
// WatchAndTransform begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. If the transform
// function is provided, the value decoded from etcd will be passed to the function
// prior to being returned.
//
// The transform function can be used to populate data not available to etcd, or to
// change or wrap the serialized etcd object.
//
// startTime := time.Now()
// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) {
// value := input.(TimeAwareValue)
// value.Since = startTime
// return value, nil
// })
//
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
// TransformFunc attempts to convert an object to another object for use with a watcher
type TransformFunc func(interface{}) (interface{}, error)
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding Codec
versioner ResourceVersioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
etcdIncoming chan *etcd.Response
etcdStop chan bool
etcdCallEnded chan struct{}
outgoing chan watch.Event
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
}
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
return w
}
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
return
}
resourceVersion = latest
}
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
}
}
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
success = true
resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
success = false
return
}
if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index + 1
}
return
}
resourceVersion = resp.EtcdIndex + 1
convertRecursiveResponse(resp.Node, resp, incoming)
return
}
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
if node.ModifiedIndex == node.CreatedIndex {
copied.Action = "create"
} else {
copied.Action = "set"
}
copied.Node = node
incoming <- &copied
}
// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
defer util.HandleCrash()
for {
select {
case <-w.etcdCallEnded:
return
case <-w.userStop:
w.etcdStop <- true
return
case res, ok := <-w.etcdIncoming:
if !ok {
return
}
w.sendResult(res)
}
}
}
func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
var index uint64
switch res.Action {
case "create":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Added
case "set", "compareAndSwap", "get":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Modified
case "delete":
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
data = []byte(res.PrevNode.Value)
index = res.Node.ModifiedIndex
action = watch.Deleted
default:
glog.Errorf("unknown action: %v", res.Action)
return
}
obj, err := w.encoding.Decode(data)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node)
// TODO: expose an error through watch.Interface?
w.Stop()
return
}
// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.SetResourceVersion(obj, index); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err)
}
}
// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err)
// TODO: expose an error through watch.Interface?
w.Stop()
return
}
}
w.emit(watch.Event{
Type: action,
Object: obj,
})
}
// ResultChannel implements watch.Interface.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
}
// Stop implements watch.Interface.
func (w *etcdWatcher) Stop() {
w.stopLock.Lock()
defer w.stopLock.Unlock()
// Prevent double channel closes.
if !w.stopped {
w.stopped = true
close(w.userStop)
}
}

View File

@ -22,12 +22,10 @@ import (
"reflect"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
@ -364,379 +362,3 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value)
}
}
func TestWatchInterpretation_ListCreate(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
})
got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestWatchInterpretation_ListAdd(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
},
})
got := <-w.outgoing
if e, a := watch.Modified, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestWatchInterpretation_Delete(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "delete",
Node: &etcd.Node{
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(podBytes),
ModifiedIndex: 1,
},
})
got := <-w.outgoing
if e, a := watch.Deleted, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
pod.ResourceVersion = 2
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "update",
})
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "set",
})
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: "foobar",
},
})
}
func TestWatch(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// when server returns not found, the watch index starts at the next value (1)
if fakeClient.WatchIndex != 1 {
t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient)
}
// Test normal case
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
},
}
event := <-watching.ResultChan()
if e, a := watch.Modified, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
// Test error case
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}
func TestWatchFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
testCases := map[string]struct {
Response EtcdResponseWithError
ExpectedVersion uint64
ExpectedType watch.EventType
}{
"get value created": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 1,
},
Action: "get",
EtcdIndex: 2,
},
},
1,
watch.Added,
},
"get value modified": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 2,
},
Action: "get",
EtcdIndex: 3,
},
},
2,
watch.Modified,
},
}
for k, testCase := range testCases {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = testCase.Response
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("%s: unexpected error: %v", k, err)
}
fakeClient.WaitForWatchCompletion()
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
t.Errorf("%s: expected watch index to be %d, got %d", k, e, a)
}
// the existing node is detected and the index set
event := <-watching.ResultChan()
if e, a := testCase.ExpectedType, event.Type; e != a {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("%s: expected a pod, got %#v", k, event.Object)
}
if actualPod.ResourceVersion != testCase.ExpectedVersion {
t.Errorf("%s: expected pod with resource version %d, Got %#v", k, testCase.ExpectedVersion, actualPod)
}
pod.ResourceVersion = testCase.ExpectedVersion
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
watching.Stop()
}
}
func TestWatchListFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Dir: true,
Nodes: etcd.Nodes{
&etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 1,
Nodes: etcd.Nodes{},
},
&etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 2,
Nodes: etcd.Nodes{},
},
},
},
Action: "get",
EtcdIndex: 3,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.WatchList("/some/key", 0, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// the existing node is detected and the index set
event := <-watching.ResultChan()
for i := 0; i < 2; i++ {
if e, a := watch.Modified, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("expected a pod, got %#v", event.Object)
}
if actualPod.ResourceVersion != 1 {
t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
}
pod.ResourceVersion = 1
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
fakeClient.WaitForWatchCompletion()
watching.Stop()
}
func TestWatchFromNotFound(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 100,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 3 {
t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
}
watching.Stop()
}
func TestWatchFromOtherError(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 101,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
select {
case _, ok := <-watching.ResultChan():
if ok {
t.Fatalf("expected result channel to be closed")
}
case <-time.After(1 * time.Millisecond):
t.Fatalf("watch should have closed channel: %#v", watching)
}
if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 {
t.Fatalf("Watch should not have been invoked: %#v", fakeClient)
}
}
func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
h := EtcdHelper{fakeClient, codec, versioner}
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
// Test purposeful shutdown
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
watching.Stop()
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}

View File

@ -0,0 +1,350 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tools
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
// FilterFunc is a predicate which takes an API object and returns true
// iff the object should remain in the set.
type FilterFunc func(obj interface{}) bool
// Everything is a FilterFunc which accepts all objects.
func Everything(interface{}) bool {
return true
}
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
return h.WatchAndTransform(key, resourceVersion, nil)
}
// WatchAndTransform begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. If the transform
// function is provided, the value decoded from etcd will be passed to the function
// prior to being returned.
//
// The transform function can be used to populate data not available to etcd, or to
// change or wrap the serialized etcd object.
//
// startTime := time.Now()
// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) {
// value := input.(TimeAwareValue)
// value.Since = startTime
// return value, nil
// })
//
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
// TransformFunc attempts to convert an object to another object for use with a watcher
type TransformFunc func(interface{}) (interface{}, error)
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding Codec
versioner ResourceVersioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
etcdIncoming chan *etcd.Response
etcdStop chan bool
etcdCallEnded chan struct{}
outgoing chan watch.Event
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
}
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
return w
}
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
return
}
resourceVersion = latest + 1
}
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
}
}
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
success = true
resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
success = false
return
}
if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index
}
return
}
resourceVersion = resp.EtcdIndex
convertRecursiveResponse(resp.Node, resp, incoming)
return
}
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
copied.Action = "get"
copied.Node = node
incoming <- &copied
}
// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
defer util.HandleCrash()
for {
select {
case <-w.etcdCallEnded:
return
case <-w.userStop:
w.etcdStop <- true
return
case res, ok := <-w.etcdIncoming:
if !ok {
return
}
w.sendResult(res)
}
}
}
func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, error) {
obj, err := w.encoding.Decode(data)
if err != nil {
return nil, err
}
// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.SetResourceVersion(obj, index); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err)
}
}
// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err)
return nil, err
}
}
return obj, nil
}
func (w *etcdWatcher) sendAdd(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data := []byte(res.Node.Value)
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
return
}
action := watch.Added
if res.Node.ModifiedIndex != res.Node.CreatedIndex {
action = watch.Modified
}
w.emit(watch.Event{
Type: action,
Object: obj,
})
}
func (w *etcdWatcher) sendModify(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
curData := []byte(res.Node.Value)
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
curObjPasses := w.filter(curObj)
oldObjPasses := false
var oldObj interface{}
if res.PrevNode != nil && res.PrevNode.Value != "" {
// Ignore problems reading the old object.
if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil {
oldObjPasses = w.filter(oldObj)
}
}
// Some changes to an object may cause it to start or stop matching a filter.
// We need to report those as adds/deletes. So we have to check both the previous
// and current value of the object.
switch {
case curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Modified,
Object: curObj,
})
case curObjPasses && !oldObjPasses:
w.emit(watch.Event{
Type: watch.Added,
Object: curObj,
})
case !curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Deleted,
Object: oldObj,
})
}
// Do nothing if neither new nor old object passed the filter.
}
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
data := []byte(res.PrevNode.Value)
index := res.PrevNode.ModifiedIndex
if res.Node != nil {
// Note that this sends the *old* object with the etcd index for the time at
// which it gets deleted. This will allow users to restart the watch at the right
// index.
index = res.Node.ModifiedIndex
}
obj, err := w.decodeObject(data, index)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
return
}
w.emit(watch.Event{
Type: watch.Deleted,
Object: obj,
})
}
func (w *etcdWatcher) sendResult(res *etcd.Response) {
switch res.Action {
case "create", "get":
w.sendAdd(res)
case "set", "compareAndSwap":
w.sendModify(res)
case "delete":
w.sendDelete(res)
default:
glog.Errorf("unknown action: %v", res.Action)
}
}
// ResultChannel implements watch.Interface.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
}
// Stop implements watch.Interface.
func (w *etcdWatcher) Stop() {
w.stopLock.Lock()
defer w.stopLock.Unlock()
// Prevent double channel closes.
if !w.stopped {
w.stopped = true
close(w.userStop)
}
}

View File

@ -0,0 +1,457 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tools
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
func TestWatchInterpretations(t *testing.T) {
// Declare some pods to make the test cases compact.
podFoo := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBar := &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}
podBaz := &api.Pod{JSONBase: api.JSONBase{ID: "baz"}}
firstLetterIsB := func(obj interface{}) bool {
return obj.(*api.Pod).ID[0] == 'b'
}
// All of these test cases will be run with the firstLetterIsB FilterFunc.
table := map[string]struct {
actions []string // Run this test item for every action here.
prevNodeValue string
nodeValue string
expectEmit bool
expectType watch.EventType
expectObject interface{}
}{
"create": {
actions: []string{"create", "get"},
nodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"create but filter blocks": {
actions: []string{"create", "get"},
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: false,
},
"delete": {
actions: []string{"delete"},
prevNodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar,
},
"delete but filter blocks": {
actions: []string{"delete"},
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: false,
},
"modify appears to create 1": {
actions: []string{"set", "compareAndSwap"},
nodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to create 2": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: api.EncodeOrDie(podFoo),
nodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to delete": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: api.EncodeOrDie(podBar),
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar, // Should return last state that passed the filter!
},
"modify modifies": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: api.EncodeOrDie(podBar),
nodeValue: api.EncodeOrDie(podBaz),
expectEmit: true,
expectType: watch.Modified,
expectObject: podBaz,
},
"modify ignores": {
actions: []string{"set", "compareAndSwap"},
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: false,
},
}
for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, firstLetterIsB, codec, versioner, nil)
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
if !item.expectEmit {
return
}
if e, a := item.expectType, event.Type; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
if e, a := item.expectObject, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
}
var n, pn *etcd.Node
if item.nodeValue != "" {
n = &etcd.Node{Value: item.nodeValue}
}
if item.prevNodeValue != "" {
pn = &etcd.Node{Value: item.prevNodeValue}
}
w.sendResult(&etcd.Response{
Action: action,
Node: n,
PrevNode: pn,
})
if e, a := item.expectEmit, emitCalled; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
w.Stop()
}
}
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "update",
})
w.Stop()
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
})
w.Stop()
}
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
Node: &etcd.Node{
Value: "foobar",
},
})
w.sendResult(&etcd.Response{
Action: action,
PrevNode: &etcd.Node{
Value: "foobar",
},
})
w.Stop()
}
}
func TestWatch(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// when server returns not found, the watch index starts at the next value (1)
if fakeClient.WatchIndex != 1 {
t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient)
}
// Test normal case
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
},
}
event := <-watching.ResultChan()
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
// Test error case
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}
func TestWatchFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
testCases := map[string]struct {
Response EtcdResponseWithError
ExpectedVersion uint64
ExpectedType watch.EventType
}{
"get value created": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 1,
},
Action: "get",
EtcdIndex: 2,
},
},
1,
watch.Added,
},
"get value modified": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 2,
},
Action: "get",
EtcdIndex: 3,
},
},
2,
watch.Modified,
},
}
for k, testCase := range testCases {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = testCase.Response
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("%s: unexpected error: %v", k, err)
}
fakeClient.WaitForWatchCompletion()
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
t.Errorf("%s: expected watch index to be %d, got %d", k, e, a)
}
// the existing node is detected and the index set
event := <-watching.ResultChan()
if e, a := testCase.ExpectedType, event.Type; e != a {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("%s: expected a pod, got %#v", k, event.Object)
}
if actualPod.ResourceVersion != testCase.ExpectedVersion {
t.Errorf("%s: expected pod with resource version %d, Got %#v", k, testCase.ExpectedVersion, actualPod)
}
pod.ResourceVersion = testCase.ExpectedVersion
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
watching.Stop()
}
}
func TestWatchListFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Dir: true,
Nodes: etcd.Nodes{
&etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 1,
Nodes: etcd.Nodes{},
},
&etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 2,
ModifiedIndex: 2,
Nodes: etcd.Nodes{},
},
},
},
Action: "get",
EtcdIndex: 3,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.WatchList("/some/key", 0, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// the existing node is detected and the index set
event, open := <-watching.ResultChan()
if !open {
t.Fatalf("unexpected channel close")
}
for i := 0; i < 2; i++ {
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("expected a pod, got %#v", event.Object)
}
if actualPod.ResourceVersion != 1 {
t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
}
pod.ResourceVersion = 1
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
fakeClient.WaitForWatchCompletion()
watching.Stop()
}
func TestWatchFromNotFound(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 100,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 3 {
t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
}
watching.Stop()
}
func TestWatchFromOtherError(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 101,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
select {
case _, ok := <-watching.ResultChan():
if ok {
t.Fatalf("expected result channel to be closed")
}
case <-time.After(1 * time.Millisecond):
t.Fatalf("watch should have closed channel: %#v", watching)
}
if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 {
t.Fatalf("Watch should not have been invoked: %#v", fakeClient)
}
}
func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
h := EtcdHelper{fakeClient, codec, versioner}
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
// Test purposeful shutdown
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
watching.Stop()
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}

View File

@ -125,7 +125,7 @@ func TestWatch(t *testing.T) {
expectedVersion = resp.Node.ModifiedIndex
event = <-w.ResultChan()
if event.Type != watch.Deleted {
t.Fatalf("expected deleted event", event)
t.Errorf("expected deleted event %#v", event)
}
pod = event.Object.(*api.Pod)
if pod.ResourceVersion != expectedVersion {