mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
commit
fbd71c9c02
@ -22,7 +22,10 @@ import (
|
||||
"reflect"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -53,6 +56,7 @@ type EtcdGetSet interface {
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||
@ -228,3 +232,156 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// FilterFunc is a predicate which takes an API object and returns true
|
||||
// iff the object should remain in the set.
|
||||
type FilterFunc func(obj interface{}) bool
|
||||
|
||||
// Everything is a FilterFunc which accepts all objects.
|
||||
func Everything(interface{}) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// WatchList begins watching the specified key's items. Items are decoded into
|
||||
// API objects, and any items passing 'filter' are sent down the returned
|
||||
// watch.Interface.
|
||||
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
|
||||
w := newEtcdWatcher(true, filter)
|
||||
go w.etcdWatch(h.Client, key)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Watch begins watching the specified key. Events are decoded into
|
||||
// API objects and sent down the returned watch.Interface.
|
||||
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
|
||||
w := newEtcdWatcher(false, nil)
|
||||
go w.etcdWatch(h.Client, key)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||
type etcdWatcher struct {
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
filter FilterFunc
|
||||
|
||||
etcdIncoming chan *etcd.Response
|
||||
etcdStop chan bool
|
||||
|
||||
outgoing chan watch.Event
|
||||
userStop chan struct{}
|
||||
}
|
||||
|
||||
// Returns a new etcdWatcher; if list is true, watch sub-nodes.
|
||||
func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
list: list,
|
||||
filter: filter,
|
||||
etcdIncoming: make(chan *etcd.Response),
|
||||
etcdStop: make(chan bool),
|
||||
outgoing: make(chan watch.Event),
|
||||
userStop: make(chan struct{}),
|
||||
}
|
||||
go w.translate()
|
||||
return w
|
||||
}
|
||||
|
||||
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
||||
// as a goroutine.
|
||||
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) {
|
||||
defer util.HandleCrash()
|
||||
_, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop)
|
||||
if err == etcd.ErrWatchStoppedByUser {
|
||||
// etcd doesn't close the channel in this case.
|
||||
close(w.etcdIncoming)
|
||||
} else {
|
||||
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be
|
||||
// called as a goroutine.
|
||||
func (w *etcdWatcher) translate() {
|
||||
defer close(w.outgoing)
|
||||
defer util.HandleCrash()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.userStop:
|
||||
w.etcdStop <- true
|
||||
return
|
||||
case res, ok := <-w.etcdIncoming:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
w.sendResult(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) sendResult(res *etcd.Response) {
|
||||
var action watch.EventType
|
||||
var data []byte
|
||||
var nodes etcd.Nodes
|
||||
switch res.Action {
|
||||
case "set":
|
||||
if res.Node == nil {
|
||||
glog.Errorf("unexpected nil node: %#v", res)
|
||||
return
|
||||
}
|
||||
data = []byte(res.Node.Value)
|
||||
nodes = res.Node.Nodes
|
||||
// TODO: Is this conditional correct?
|
||||
if res.EtcdIndex > 0 {
|
||||
action = watch.Modified
|
||||
} else {
|
||||
action = watch.Added
|
||||
}
|
||||
case "delete":
|
||||
if res.PrevNode == nil {
|
||||
glog.Errorf("unexpected nil prev node: %#v", res)
|
||||
return
|
||||
}
|
||||
data = []byte(res.PrevNode.Value)
|
||||
nodes = res.PrevNode.Nodes
|
||||
action = watch.Deleted
|
||||
}
|
||||
|
||||
// If listing, we're interested in sub-nodes.
|
||||
if w.list {
|
||||
for _, n := range nodes {
|
||||
obj, err := api.Decode([]byte(n.Value))
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: %#v", res)
|
||||
continue
|
||||
}
|
||||
if w.filter != nil && !w.filter(obj) {
|
||||
continue
|
||||
}
|
||||
w.outgoing <- watch.Event{
|
||||
Type: action,
|
||||
Object: obj,
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
obj, err := api.Decode(data)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: %#v", res)
|
||||
return
|
||||
}
|
||||
w.outgoing <- watch.Event{
|
||||
Type: action,
|
||||
Object: obj,
|
||||
}
|
||||
}
|
||||
|
||||
// ResultChannel implements watch.Interface.
|
||||
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
|
||||
return w.outgoing
|
||||
}
|
||||
|
||||
// Stop implements watch.Interface.
|
||||
func (w *etcdWatcher) Stop() {
|
||||
close(w.userStop)
|
||||
}
|
||||
|
@ -20,8 +20,11 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
@ -150,3 +153,181 @@ func TestSetObj(t *testing.T) {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchInterpretation_ListAdd(t *testing.T) {
|
||||
called := false
|
||||
w := newEtcdWatcher(true, func(interface{}) bool {
|
||||
called = true
|
||||
return true
|
||||
})
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
|
||||
go w.sendResult(&etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Nodes: etcd.Nodes{
|
||||
{
|
||||
Value: string(podBytes),
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
got := <-w.outgoing
|
||||
if e, a := watch.Added, got.Type; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if !called {
|
||||
t.Errorf("filter never called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchInterpretation_ListDelete(t *testing.T) {
|
||||
called := false
|
||||
w := newEtcdWatcher(true, func(interface{}) bool {
|
||||
called = true
|
||||
return true
|
||||
})
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
|
||||
go w.sendResult(&etcd.Response{
|
||||
Action: "delete",
|
||||
PrevNode: &etcd.Node{
|
||||
Nodes: etcd.Nodes{
|
||||
{
|
||||
Value: string(podBytes),
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
got := <-w.outgoing
|
||||
if e, a := watch.Deleted, got.Type; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if !called {
|
||||
t.Errorf("filter never called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchInterpretation_SingleAdd(t *testing.T) {
|
||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||
t.Errorf("unexpected filter call")
|
||||
return true
|
||||
})
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
|
||||
go w.sendResult(&etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(podBytes),
|
||||
},
|
||||
})
|
||||
|
||||
got := <-w.outgoing
|
||||
if e, a := watch.Added, got.Type; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchInterpretation_SingleDelete(t *testing.T) {
|
||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||
t.Errorf("unexpected filter call")
|
||||
return true
|
||||
})
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
|
||||
go w.sendResult(&etcd.Response{
|
||||
Action: "delete",
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(podBytes),
|
||||
},
|
||||
})
|
||||
|
||||
got := <-w.outgoing
|
||||
if e, a := watch.Deleted, got.Type; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
fakeEtcd := MakeFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeEtcd}
|
||||
|
||||
watching, err := h.Watch("/some/key")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
fakeEtcd.WaitForWatchCompletion()
|
||||
|
||||
// Test normal case
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
fakeEtcd.WatchResponse <- &etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(podBytes),
|
||||
},
|
||||
}
|
||||
|
||||
select {
|
||||
case event := <-watching.ResultChan():
|
||||
if e, a := watch.Added, event.Type; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
t.Errorf("Expected 1 call but got 0")
|
||||
}
|
||||
|
||||
// Test error case
|
||||
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
|
||||
|
||||
// Did everything shut down?
|
||||
if _, open := <-fakeEtcd.WatchResponse; open {
|
||||
t.Errorf("An injected error did not cause a graceful shutdown")
|
||||
}
|
||||
if _, open := <-watching.ResultChan(); open {
|
||||
t.Errorf("An injected error did not cause a graceful shutdown")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchPurposefulShutdown(t *testing.T) {
|
||||
fakeEtcd := MakeFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeEtcd}
|
||||
|
||||
// Test purposeful shutdown
|
||||
watching, err := h.Watch("/some/key")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
fakeEtcd.WaitForWatchCompletion()
|
||||
watching.Stop()
|
||||
|
||||
// Did everything shut down?
|
||||
if _, open := <-fakeEtcd.WatchResponse; open {
|
||||
t.Errorf("A stop did not cause a graceful shutdown")
|
||||
}
|
||||
if _, open := <-watching.ResultChan(); open {
|
||||
t.Errorf("An injected error did not cause a graceful shutdown")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user