mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Merge pull request #13380 from wojtek-t/refactor_etcd_watch_test
Refactoring of watch etcd tests.
This commit is contained in:
commit
c048419810
@ -21,6 +21,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
@ -120,7 +121,10 @@ func copyOrDie(obj runtime.Object) runtime.Object {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type AssignFunc func([]runtime.Object) []runtime.Object
|
type AssignFunc func([]runtime.Object) []runtime.Object
|
||||||
|
type EmitFunc func(runtime.Object, string) error
|
||||||
type GetFunc func(api.Context, runtime.Object) (runtime.Object, error)
|
type GetFunc func(api.Context, runtime.Object) (runtime.Object, error)
|
||||||
|
type InitWatchFunc func()
|
||||||
|
type InjectErrFunc func(err error)
|
||||||
type SetFunc func(api.Context, runtime.Object) error
|
type SetFunc func(api.Context, runtime.Object) error
|
||||||
type SetRVFunc func(uint64)
|
type SetRVFunc func(uint64)
|
||||||
type UpdateFunc func(runtime.Object) runtime.Object
|
type UpdateFunc func(runtime.Object) runtime.Object
|
||||||
@ -185,7 +189,7 @@ func (t *Tester) TestGet(obj runtime.Object) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test listing object.
|
// Test listing objects.
|
||||||
func (t *Tester) TestList(obj runtime.Object, assignFn AssignFunc, setRVFn SetRVFunc) {
|
func (t *Tester) TestList(obj runtime.Object, assignFn AssignFunc, setRVFn SetRVFunc) {
|
||||||
t.testListError()
|
t.testListError()
|
||||||
t.testListFound(obj, assignFn)
|
t.testListFound(obj, assignFn)
|
||||||
@ -193,6 +197,15 @@ func (t *Tester) TestList(obj runtime.Object, assignFn AssignFunc, setRVFn SetRV
|
|||||||
t.testListMatchLabels(obj, assignFn)
|
t.testListMatchLabels(obj, assignFn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test watching objects.
|
||||||
|
func (t *Tester) TestWatch(
|
||||||
|
obj runtime.Object, initWatchFn InitWatchFunc, injectErrFn InjectErrFunc, emitFn EmitFunc,
|
||||||
|
labelsPass, labelsFail []labels.Set, fieldsPass, fieldsFail []fields.Set, actions []string) {
|
||||||
|
t.testWatch(initWatchFn, injectErrFn)
|
||||||
|
t.testWatchLabels(copyOrDie(obj), initWatchFn, emitFn, labelsPass, labelsFail, actions)
|
||||||
|
t.testWatchFields(copyOrDie(obj), initWatchFn, emitFn, fieldsPass, fieldsFail, actions)
|
||||||
|
}
|
||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
// Creation tests.
|
// Creation tests.
|
||||||
|
|
||||||
@ -914,3 +927,125 @@ func (t *Tester) testListNotFound(assignFn AssignFunc, setRVFn SetRVFunc) {
|
|||||||
t.Errorf("unexpected resource version: %d", meta.ResourceVersion)
|
t.Errorf("unexpected resource version: %d", meta.ResourceVersion)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// =============================================================================
|
||||||
|
// Watching tests.
|
||||||
|
|
||||||
|
func (t *Tester) testWatch(initWatchFn InitWatchFunc, injectErrFn InjectErrFunc) {
|
||||||
|
ctx := t.TestContext()
|
||||||
|
watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), fields.Everything(), "1")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
initWatchFn()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, ok := <-watcher.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("watch channel should be open")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
injectErrFn(nil)
|
||||||
|
if _, ok := <-watcher.ResultChan(); ok {
|
||||||
|
t.Errorf("watch channel should be closed")
|
||||||
|
}
|
||||||
|
watcher.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tester) testWatchFields(obj runtime.Object, initWatchFn InitWatchFunc, emitFn EmitFunc, fieldsPass, fieldsFail []fields.Set, actions []string) {
|
||||||
|
ctx := t.TestContext()
|
||||||
|
|
||||||
|
for _, field := range fieldsPass {
|
||||||
|
for _, action := range actions {
|
||||||
|
watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), field.AsSelector(), "1")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
initWatchFn()
|
||||||
|
if err := emitFn(obj, action); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, ok := <-watcher.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("watch channel should be open")
|
||||||
|
}
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
t.Errorf("unexpected timeout from result channel")
|
||||||
|
}
|
||||||
|
watcher.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range fieldsFail {
|
||||||
|
for _, action := range actions {
|
||||||
|
watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), field.AsSelector(), "1")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
initWatchFn()
|
||||||
|
if err := emitFn(obj, action); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-watcher.ResultChan():
|
||||||
|
t.Errorf("unexpected result from result channel")
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
// expected case
|
||||||
|
}
|
||||||
|
watcher.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tester) testWatchLabels(obj runtime.Object, initWatchFn InitWatchFunc, emitFn EmitFunc, labelsPass, labelsFail []labels.Set, actions []string) {
|
||||||
|
ctx := t.TestContext()
|
||||||
|
|
||||||
|
for _, label := range labelsPass {
|
||||||
|
for _, action := range actions {
|
||||||
|
watcher, err := t.storage.(rest.Watcher).Watch(ctx, label.AsSelector(), fields.Everything(), "1")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
initWatchFn()
|
||||||
|
if err := emitFn(obj, action); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, ok := <-watcher.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("watch channel should be open")
|
||||||
|
}
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
t.Errorf("unexpected timeout from result channel")
|
||||||
|
}
|
||||||
|
watcher.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, label := range labelsFail {
|
||||||
|
for _, action := range actions {
|
||||||
|
watcher, err := t.storage.(rest.Watcher).Watch(ctx, label.AsSelector(), fields.Everything(), "1")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
initWatchFn()
|
||||||
|
if err := emitFn(obj, action); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-watcher.ResultChan():
|
||||||
|
t.Errorf("unexpected result from result channel")
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
// expected case
|
||||||
|
}
|
||||||
|
watcher.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -18,7 +18,6 @@ package etcd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
@ -26,19 +25,12 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
|
||||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
|
||||||
"k8s.io/kubernetes/pkg/tools"
|
"k8s.io/kubernetes/pkg/tools"
|
||||||
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
PASS = iota
|
|
||||||
FAIL
|
|
||||||
)
|
|
||||||
|
|
||||||
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
||||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
|
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
|
||||||
return NewREST(etcdStorage), fakeClient
|
return NewREST(etcdStorage), fakeClient
|
||||||
@ -83,7 +75,7 @@ func validNewController() *api.ReplicationController {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var validController = *validNewController()
|
var validController = validNewController()
|
||||||
|
|
||||||
func TestCreate(t *testing.T) {
|
func TestCreate(t *testing.T) {
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
@ -219,13 +211,54 @@ func TestEtcdListControllers(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchControllers(t *testing.T) {
|
||||||
|
storage, fakeClient := newStorage(t)
|
||||||
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
|
test.TestWatch(
|
||||||
|
validController,
|
||||||
|
func() {
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
fakeClient.WatchInjectError <- err
|
||||||
|
},
|
||||||
|
func(obj runtime.Object, action string) error {
|
||||||
|
return registrytest.EmitObject(fakeClient, obj, action)
|
||||||
|
},
|
||||||
|
// matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"a": "b"},
|
||||||
|
},
|
||||||
|
// not matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"a": "c"},
|
||||||
|
{"foo": "bar"},
|
||||||
|
},
|
||||||
|
// matching fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"status.replicas": "0"},
|
||||||
|
{"metadata.name": "foo"},
|
||||||
|
{"status.replicas": "0", "metadata.name": "foo"},
|
||||||
|
},
|
||||||
|
// not matchin fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"status.replicas": "10"},
|
||||||
|
{"metadata.name": "bar"},
|
||||||
|
{"name": "foo"},
|
||||||
|
{"status.replicas": "10", "metadata.name": "foo"},
|
||||||
|
{"status.replicas": "0", "metadata.name": "bar"},
|
||||||
|
},
|
||||||
|
registrytest.WatchActions,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func TestEtcdDeleteController(t *testing.T) {
|
func TestEtcdDeleteController(t *testing.T) {
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
key, _ := storage.KeyFunc(ctx, validController.Name)
|
key, _ := storage.KeyFunc(ctx, validController.Name)
|
||||||
key = etcdtest.AddPrefix(key)
|
key = etcdtest.AddPrefix(key)
|
||||||
|
|
||||||
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewController()), 0)
|
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validController), 0)
|
||||||
obj, err := storage.Delete(ctx, validController.Name, nil)
|
obj, err := storage.Delete(ctx, validController.Name, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -243,196 +276,6 @@ func TestEtcdDeleteController(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEtcdWatchController(t *testing.T) {
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.Everything(),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
fakeClient.WatchInjectError <- nil
|
|
||||||
if _, ok := <-watching.ResultChan(); ok {
|
|
||||||
t.Errorf("watching channel should be closed")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchControllersMatch(t *testing.T) {
|
|
||||||
ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace)
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
|
||||||
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(validController.Spec.Selector),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
// The watcher above is waiting for these Labels, on receiving them it should
|
|
||||||
// apply the ControllerStatus decorator, which lists pods, causing a query against
|
|
||||||
// the /registry/pods endpoint of the etcd client.
|
|
||||||
controller := &api.ReplicationController{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Labels: validController.Spec.Selector,
|
|
||||||
Namespace: "default",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
controllerBytes, _ := testapi.Codec().Encode(controller)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(controllerBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
t.Error("unexpected timeout from result channel")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchControllersFields(t *testing.T) {
|
|
||||||
ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace)
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
|
||||||
|
|
||||||
testFieldMap := map[int][]fields.Set{
|
|
||||||
PASS: {
|
|
||||||
{"status.replicas": "0"},
|
|
||||||
{"metadata.name": "foo"},
|
|
||||||
{"status.replicas": "0", "metadata.name": "foo"},
|
|
||||||
},
|
|
||||||
FAIL: {
|
|
||||||
{"status.replicas": "10"},
|
|
||||||
{"metadata.name": "bar"},
|
|
||||||
{"name": "foo"},
|
|
||||||
{"status.replicas": "10", "metadata.name": "foo"},
|
|
||||||
{"status.replicas": "0", "metadata.name": "bar"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
testEtcdActions := []string{
|
|
||||||
etcdstorage.EtcdCreate,
|
|
||||||
etcdstorage.EtcdSet,
|
|
||||||
etcdstorage.EtcdCAS,
|
|
||||||
etcdstorage.EtcdDelete}
|
|
||||||
|
|
||||||
controller := &api.ReplicationController{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Labels: validController.Spec.Selector,
|
|
||||||
Namespace: "default",
|
|
||||||
},
|
|
||||||
Status: api.ReplicationControllerStatus{
|
|
||||||
Replicas: 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
controllerBytes, _ := testapi.Codec().Encode(controller)
|
|
||||||
|
|
||||||
for expectedResult, fieldSet := range testFieldMap {
|
|
||||||
for _, field := range fieldSet {
|
|
||||||
for _, action := range testEtcdActions {
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.Everything(),
|
|
||||||
field.AsSelector(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
var prevNode *etcd.Node = nil
|
|
||||||
node := &etcd.Node{
|
|
||||||
Value: string(controllerBytes),
|
|
||||||
}
|
|
||||||
if action == etcdstorage.EtcdDelete {
|
|
||||||
prevNode = node
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: action,
|
|
||||||
Node: node,
|
|
||||||
PrevNode: prevNode,
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case r, ok := <-watching.ResultChan():
|
|
||||||
if expectedResult == FAIL {
|
|
||||||
t.Errorf("Unexpected result from channel %#v", r)
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
if expectedResult == PASS {
|
|
||||||
t.Error("unexpected timeout from result channel")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchControllersNotMatch(t *testing.T) {
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
|
||||||
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
controller := &api.ReplicationController{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "bar",
|
|
||||||
Labels: map[string]string{
|
|
||||||
"name": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
controllerBytes, _ := testapi.Codec().Encode(controller)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(controllerBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-watching.ResultChan():
|
|
||||||
t.Error("unexpected result from result channel")
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
// expected case
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDelete(t *testing.T) {
|
func TestDelete(t *testing.T) {
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
|
@ -18,7 +18,6 @@ package etcd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
@ -27,19 +26,12 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/expapi"
|
"k8s.io/kubernetes/pkg/expapi"
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
|
||||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
|
||||||
"k8s.io/kubernetes/pkg/tools"
|
"k8s.io/kubernetes/pkg/tools"
|
||||||
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
PASS = iota
|
|
||||||
FAIL
|
|
||||||
)
|
|
||||||
|
|
||||||
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
||||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
|
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
|
||||||
return NewREST(etcdStorage), fakeClient
|
return NewREST(etcdStorage), fakeClient
|
||||||
@ -84,16 +76,16 @@ func validNewDaemon() *expapi.Daemon {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var validDaemon = *validNewDaemon()
|
var validDaemon = validNewDaemon()
|
||||||
|
|
||||||
func TestCreate(t *testing.T) {
|
func TestCreate(t *testing.T) {
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
test := resttest.New(t, storage, fakeClient.SetError)
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
controller := validNewDaemon()
|
daemon := validNewDaemon()
|
||||||
controller.ObjectMeta = api.ObjectMeta{}
|
daemon.ObjectMeta = api.ObjectMeta{}
|
||||||
test.TestCreate(
|
test.TestCreate(
|
||||||
// valid
|
// valid
|
||||||
controller,
|
daemon,
|
||||||
func(ctx api.Context, obj runtime.Object) error {
|
func(ctx api.Context, obj runtime.Object) error {
|
||||||
return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj)
|
return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj)
|
||||||
},
|
},
|
||||||
@ -175,13 +167,49 @@ func TestEtcdListControllers(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchControllers(t *testing.T) {
|
||||||
|
storage, fakeClient := newStorage(t)
|
||||||
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
|
test.TestWatch(
|
||||||
|
validDaemon,
|
||||||
|
func() {
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
fakeClient.WatchInjectError <- err
|
||||||
|
},
|
||||||
|
func(obj runtime.Object, action string) error {
|
||||||
|
return registrytest.EmitObject(fakeClient, obj, action)
|
||||||
|
},
|
||||||
|
// matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"a": "b"},
|
||||||
|
},
|
||||||
|
// not matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"a": "c"},
|
||||||
|
{"foo": "bar"},
|
||||||
|
},
|
||||||
|
// matching fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "foo"},
|
||||||
|
},
|
||||||
|
// notmatching fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "bar"},
|
||||||
|
{"name": "foo"},
|
||||||
|
},
|
||||||
|
registrytest.WatchActions,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func TestEtcdDeleteController(t *testing.T) {
|
func TestEtcdDeleteController(t *testing.T) {
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
key, err := storage.KeyFunc(ctx, validDaemon.Name)
|
key, err := storage.KeyFunc(ctx, validDaemon.Name)
|
||||||
key = etcdtest.AddPrefix(key)
|
key = etcdtest.AddPrefix(key)
|
||||||
|
|
||||||
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewDaemon()), 0)
|
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validDaemon), 0)
|
||||||
obj, err := storage.Delete(ctx, validDaemon.Name, nil)
|
obj, err := storage.Delete(ctx, validDaemon.Name, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -199,195 +227,6 @@ func TestEtcdDeleteController(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEtcdWatchController(t *testing.T) {
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.Everything(),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
fakeClient.WatchInjectError <- nil
|
|
||||||
if _, ok := <-watching.ResultChan(); ok {
|
|
||||||
t.Errorf("watching channel should be closed")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tests that we can watch for the creation of daemon controllers with specified labels.
|
|
||||||
func TestEtcdWatchControllersMatch(t *testing.T) {
|
|
||||||
ctx := api.WithNamespace(api.NewDefaultContext(), validDaemon.Namespace)
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
|
||||||
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(validDaemon.Spec.Selector),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
// The watcher above is waiting for these Labels, on receiving them it should
|
|
||||||
// apply the ControllerStatus decorator, which lists pods, causing a query against
|
|
||||||
// the /registry/pods endpoint of the etcd client.
|
|
||||||
controller := &expapi.Daemon{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Labels: validDaemon.Spec.Selector,
|
|
||||||
Namespace: "default",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
controllerBytes, _ := testapi.Codec().Encode(controller)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(controllerBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
t.Error("unexpected timeout from result channel")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tests that we can watch for daemon controllers with specified fields.
|
|
||||||
func TestEtcdWatchControllersFields(t *testing.T) {
|
|
||||||
ctx := api.WithNamespace(api.NewDefaultContext(), validDaemon.Namespace)
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
|
||||||
|
|
||||||
testFieldMap := map[int][]fields.Set{
|
|
||||||
PASS: {
|
|
||||||
{"metadata.name": "foo"},
|
|
||||||
},
|
|
||||||
FAIL: {
|
|
||||||
{"metadata.name": "bar"},
|
|
||||||
{"name": "foo"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
testEtcdActions := []string{
|
|
||||||
etcdstorage.EtcdCreate,
|
|
||||||
etcdstorage.EtcdSet,
|
|
||||||
etcdstorage.EtcdCAS,
|
|
||||||
etcdstorage.EtcdDelete}
|
|
||||||
|
|
||||||
controller := &expapi.Daemon{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Labels: validDaemon.Spec.Selector,
|
|
||||||
Namespace: "default",
|
|
||||||
},
|
|
||||||
Status: expapi.DaemonStatus{
|
|
||||||
CurrentNumberScheduled: 2,
|
|
||||||
NumberMisscheduled: 1,
|
|
||||||
DesiredNumberScheduled: 4,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
controllerBytes, _ := testapi.Codec().Encode(controller)
|
|
||||||
|
|
||||||
for expectedResult, fieldSet := range testFieldMap {
|
|
||||||
for _, field := range fieldSet {
|
|
||||||
for _, action := range testEtcdActions {
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.Everything(),
|
|
||||||
field.AsSelector(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
var prevNode *etcd.Node = nil
|
|
||||||
node := &etcd.Node{
|
|
||||||
Value: string(controllerBytes),
|
|
||||||
}
|
|
||||||
if action == etcdstorage.EtcdDelete {
|
|
||||||
prevNode = node
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: action,
|
|
||||||
Node: node,
|
|
||||||
PrevNode: prevNode,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case r, ok := <-watching.ResultChan():
|
|
||||||
if expectedResult == FAIL {
|
|
||||||
t.Errorf("Unexpected result from channel %#v", r)
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
if expectedResult == PASS {
|
|
||||||
t.Error("unexpected timeout from result channel")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchControllersNotMatch(t *testing.T) {
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
|
||||||
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
controller := &expapi.Daemon{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "bar",
|
|
||||||
Labels: map[string]string{
|
|
||||||
"name": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
controllerBytes, _ := testapi.Codec().Encode(controller)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(controllerBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-watching.ResultChan():
|
|
||||||
t.Error("unexpected result from result channel")
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
// expected case
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDelete(t *testing.T) {
|
func TestDelete(t *testing.T) {
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
|
@ -19,7 +19,6 @@ package etcd
|
|||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/resource"
|
"k8s.io/kubernetes/pkg/api/resource"
|
||||||
@ -169,6 +168,41 @@ func TestEtcdListNodes(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchNodes(t *testing.T) {
|
||||||
|
storage, fakeClient := newStorage(t)
|
||||||
|
test := resttest.New(t, storage, fakeClient.SetError).ClusterScope()
|
||||||
|
test.TestWatch(
|
||||||
|
validNewNode(),
|
||||||
|
func() {
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
fakeClient.WatchInjectError <- err
|
||||||
|
},
|
||||||
|
func(obj runtime.Object, action string) error {
|
||||||
|
return registrytest.EmitObject(fakeClient, obj, action)
|
||||||
|
},
|
||||||
|
// matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"name": "foo"},
|
||||||
|
},
|
||||||
|
// not matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"name": "bar"},
|
||||||
|
{"foo": "bar"},
|
||||||
|
},
|
||||||
|
// matching fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "foo"},
|
||||||
|
},
|
||||||
|
// not matchin fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "bar"},
|
||||||
|
},
|
||||||
|
registrytest.WatchActions,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func TestEtcdDeleteNode(t *testing.T) {
|
func TestEtcdDeleteNode(t *testing.T) {
|
||||||
ctx := api.NewContext()
|
ctx := api.NewContext()
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
@ -188,94 +222,3 @@ func TestEtcdDeleteNode(t *testing.T) {
|
|||||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEtcdWatchNode(t *testing.T) {
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.Everything(),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
fakeClient.WatchInjectError <- nil
|
|
||||||
if _, ok := <-watching.ResultChan(); ok {
|
|
||||||
t.Errorf("watching channel should be closed")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchNodesMatch(t *testing.T) {
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
node := validNewNode()
|
|
||||||
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": node.Name}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
nodeBytes, _ := testapi.Codec().Encode(node)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(nodeBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
t.Error("unexpected timeout from result channel")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchNodesNotMatch(t *testing.T) {
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
storage, fakeClient := newStorage(t)
|
|
||||||
node := validNewNode()
|
|
||||||
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": "bar"}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
nodeBytes, _ := testapi.Codec().Encode(node)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(nodeBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-watching.ResultChan():
|
|
||||||
t.Error("unexpected result from result channel")
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
// expected case
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
@ -376,17 +375,15 @@ func TestDeletePod(t *testing.T) {
|
|||||||
func TestEtcdGet(t *testing.T) {
|
func TestEtcdGet(t *testing.T) {
|
||||||
storage, _, _, fakeClient := newStorage(t)
|
storage, _, _, fakeClient := newStorage(t)
|
||||||
test := resttest.New(t, storage, fakeClient.SetError)
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
pod := validNewPod()
|
test.TestGet(validNewPod())
|
||||||
test.TestGet(pod)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEtcdList(t *testing.T) {
|
func TestEtcdList(t *testing.T) {
|
||||||
storage, _, _, fakeClient := newStorage(t)
|
storage, _, _, fakeClient := newStorage(t)
|
||||||
test := resttest.New(t, storage, fakeClient.SetError)
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext()))
|
key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext()))
|
||||||
pod := validNewPod()
|
|
||||||
test.TestList(
|
test.TestList(
|
||||||
pod,
|
validNewPod(),
|
||||||
func(objects []runtime.Object) []runtime.Object {
|
func(objects []runtime.Object) []runtime.Object {
|
||||||
return registrytest.SetObjectsForKey(fakeClient, key, objects)
|
return registrytest.SetObjectsForKey(fakeClient, key, objects)
|
||||||
},
|
},
|
||||||
@ -395,6 +392,38 @@ func TestEtcdList(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatch(t *testing.T) {
|
||||||
|
storage, _, _, fakeClient := newStorage(t)
|
||||||
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
|
test.TestWatch(
|
||||||
|
validNewPod(),
|
||||||
|
func() {
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
fakeClient.WatchInjectError <- err
|
||||||
|
},
|
||||||
|
func(obj runtime.Object, action string) error {
|
||||||
|
return registrytest.EmitObject(fakeClient, obj, action)
|
||||||
|
},
|
||||||
|
// matching labels
|
||||||
|
[]labels.Set{},
|
||||||
|
// not matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"foo": "bar"},
|
||||||
|
},
|
||||||
|
// matching fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "foo"},
|
||||||
|
},
|
||||||
|
// not matchin fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "bar"},
|
||||||
|
},
|
||||||
|
registrytest.WatchActions,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func TestEtcdCreate(t *testing.T) {
|
func TestEtcdCreate(t *testing.T) {
|
||||||
storage, bindingStorage, _, fakeClient := newStorage(t)
|
storage, bindingStorage, _, fakeClient := newStorage(t)
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
@ -893,107 +922,3 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
|
|||||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEtcdWatchPods(t *testing.T) {
|
|
||||||
storage, _, _, fakeClient := newStorage(t)
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.Everything(),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
fakeClient.WatchInjectError <- nil
|
|
||||||
if _, ok := <-watching.ResultChan(); ok {
|
|
||||||
t.Errorf("watching channel should be closed")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchPodsMatch(t *testing.T) {
|
|
||||||
storage, _, _, fakeClient := newStorage(t)
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
pod := &api.Pod{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Namespace: "default",
|
|
||||||
Labels: map[string]string{
|
|
||||||
"name": "foo",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
podBytes, _ := testapi.Codec().Encode(pod)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(podBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
t.Error("unexpected timeout from result channel")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchPodsNotMatch(t *testing.T) {
|
|
||||||
storage, _, _, fakeClient := newStorage(t)
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
pod := &api.Pod{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "bar",
|
|
||||||
Labels: map[string]string{
|
|
||||||
"name": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
podBytes, _ := testapi.Codec().Encode(pod)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(podBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-watching.ResultChan():
|
|
||||||
t.Error("unexpected result from result channel")
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
// expected case
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -37,9 +37,31 @@ func NewEtcdStorage(t *testing.T) (storage.Interface, *tools.FakeEtcdClient) {
|
|||||||
return etcdStorage, fakeClient
|
return etcdStorage, fakeClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var WatchActions = []string{etcdstorage.EtcdCreate, etcdstorage.EtcdSet, etcdstorage.EtcdCAS, etcdstorage.EtcdDelete}
|
||||||
|
|
||||||
type keyFunc func(api.Context, string) (string, error)
|
type keyFunc func(api.Context, string) (string, error)
|
||||||
type newFunc func() runtime.Object
|
type newFunc func() runtime.Object
|
||||||
|
|
||||||
|
func EmitObject(fakeClient *tools.FakeEtcdClient, obj runtime.Object, action string) error {
|
||||||
|
encoded, err := testapi.Codec().Encode(obj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
node := &etcd.Node{
|
||||||
|
Value: string(encoded),
|
||||||
|
}
|
||||||
|
var prevNode *etcd.Node = nil
|
||||||
|
if action == etcdstorage.EtcdDelete {
|
||||||
|
prevNode = node
|
||||||
|
}
|
||||||
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
|
Action: action,
|
||||||
|
Node: node,
|
||||||
|
PrevNode: prevNode,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func GetObject(fakeClient *tools.FakeEtcdClient, keyFn keyFunc, newFn newFunc, ctx api.Context, obj runtime.Object) (runtime.Object, error) {
|
func GetObject(fakeClient *tools.FakeEtcdClient, keyFn keyFunc, newFn newFunc, ctx api.Context, obj runtime.Object) (runtime.Object, error) {
|
||||||
meta, err := api.ObjectMetaFor(obj)
|
meta, err := api.ObjectMetaFor(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -19,7 +19,6 @@ package etcd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/resource"
|
"k8s.io/kubernetes/pkg/api/resource"
|
||||||
@ -181,17 +180,15 @@ func TestDeleteResourceQuota(t *testing.T) {
|
|||||||
func TestEtcdGet(t *testing.T) {
|
func TestEtcdGet(t *testing.T) {
|
||||||
storage, _, fakeClient := newStorage(t)
|
storage, _, fakeClient := newStorage(t)
|
||||||
test := resttest.New(t, storage, fakeClient.SetError)
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
resourcequota := validNewResourceQuota()
|
test.TestGet(validNewResourceQuota())
|
||||||
test.TestGet(resourcequota)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEtcdList(t *testing.T) {
|
func TestEtcdList(t *testing.T) {
|
||||||
storage, _, fakeClient := newStorage(t)
|
storage, _, fakeClient := newStorage(t)
|
||||||
test := resttest.New(t, storage, fakeClient.SetError)
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext()))
|
key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext()))
|
||||||
resourcequota := validNewResourceQuota()
|
|
||||||
test.TestList(
|
test.TestList(
|
||||||
resourcequota,
|
validNewResourceQuota(),
|
||||||
func(objects []runtime.Object) []runtime.Object {
|
func(objects []runtime.Object) []runtime.Object {
|
||||||
return registrytest.SetObjectsForKey(fakeClient, key, objects)
|
return registrytest.SetObjectsForKey(fakeClient, key, objects)
|
||||||
},
|
},
|
||||||
@ -200,6 +197,38 @@ func TestEtcdList(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatch(t *testing.T) {
|
||||||
|
storage, _, fakeClient := newStorage(t)
|
||||||
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
|
test.TestWatch(
|
||||||
|
validNewResourceQuota(),
|
||||||
|
func() {
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
fakeClient.WatchInjectError <- err
|
||||||
|
},
|
||||||
|
func(obj runtime.Object, action string) error {
|
||||||
|
return registrytest.EmitObject(fakeClient, obj, action)
|
||||||
|
},
|
||||||
|
// matching labels
|
||||||
|
[]labels.Set{},
|
||||||
|
// not matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"foo": "bar"},
|
||||||
|
},
|
||||||
|
// matching fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "foo"},
|
||||||
|
},
|
||||||
|
// not matchin fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "bar"},
|
||||||
|
},
|
||||||
|
registrytest.WatchActions,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func TestEtcdUpdateStatus(t *testing.T) {
|
func TestEtcdUpdateStatus(t *testing.T) {
|
||||||
storage, status, fakeClient := newStorage(t)
|
storage, status, fakeClient := newStorage(t)
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
@ -249,106 +278,3 @@ func TestEtcdUpdateStatus(t *testing.T) {
|
|||||||
t.Errorf("unexpected object: %s", util.ObjectDiff(&expected, rqOut))
|
t.Errorf("unexpected object: %s", util.ObjectDiff(&expected, rqOut))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEtcdWatchResourceQuotas(t *testing.T) {
|
|
||||||
storage, _, fakeClient := newStorage(t)
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.Everything(),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
fakeClient.WatchInjectError <- nil
|
|
||||||
if _, ok := <-watching.ResultChan(); ok {
|
|
||||||
t.Errorf("watching channel should be closed")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchResourceQuotasMatch(t *testing.T) {
|
|
||||||
storage, _, fakeClient := newStorage(t)
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
resourcequota := &api.ResourceQuota{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Labels: map[string]string{
|
|
||||||
"name": "foo",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
resourcequotaBytes, _ := testapi.Codec().Encode(resourcequota)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(resourcequotaBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case _, ok := <-watching.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("watching channel should be open")
|
|
||||||
}
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
t.Error("unexpected timeout from result channel")
|
|
||||||
}
|
|
||||||
watching.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEtcdWatchResourceQuotasNotMatch(t *testing.T) {
|
|
||||||
storage, _, fakeClient := newStorage(t)
|
|
||||||
ctx := api.NewDefaultContext()
|
|
||||||
watching, err := storage.Watch(ctx,
|
|
||||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
|
||||||
fields.Everything(),
|
|
||||||
"1",
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
fakeClient.WaitForWatchCompletion()
|
|
||||||
|
|
||||||
resourcequota := &api.ResourceQuota{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "bar",
|
|
||||||
Labels: map[string]string{
|
|
||||||
"name": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
resourcequotaBytes, _ := testapi.Codec().Encode(resourcequota)
|
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
|
||||||
Action: "create",
|
|
||||||
Node: &etcd.Node{
|
|
||||||
Value: string(resourcequotaBytes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-watching.ResultChan():
|
|
||||||
t.Error("unexpected result from result channel")
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
// expected case
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -106,9 +106,10 @@ func MatchResourceQuota(label labels.Selector, field fields.Selector) generic.Ma
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ResourceQuotaToSelectableFields returns a label set that represents the object
|
// ResourceQuotaToSelectableFields returns a label set that represents the object
|
||||||
// TODO: fields are not labels, and the validation rules for them do not apply.
|
|
||||||
func ResourceQuotaToSelectableFields(resourcequota *api.ResourceQuota) labels.Set {
|
func ResourceQuotaToSelectableFields(resourcequota *api.ResourceQuota) labels.Set {
|
||||||
return labels.Set{
|
return labels.Set{
|
||||||
|
"metadata.name": resourcequota.Name,
|
||||||
|
// Having "name" is a bug, but it must be supported for v1 API backward compatibility.
|
||||||
"name": resourcequota.Name,
|
"name": resourcequota.Name,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/rest/resttest"
|
"k8s.io/kubernetes/pkg/api/rest/resttest"
|
||||||
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/tools"
|
"k8s.io/kubernetes/pkg/tools"
|
||||||
@ -119,3 +121,35 @@ func TestUpdate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatch(t *testing.T) {
|
||||||
|
storage, fakeClient := newStorage(t)
|
||||||
|
test := resttest.New(t, storage, fakeClient.SetError)
|
||||||
|
test.TestWatch(
|
||||||
|
validService(),
|
||||||
|
func() {
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
fakeClient.WatchInjectError <- err
|
||||||
|
},
|
||||||
|
func(obj runtime.Object, action string) error {
|
||||||
|
return registrytest.EmitObject(fakeClient, obj, action)
|
||||||
|
},
|
||||||
|
// matching labels
|
||||||
|
[]labels.Set{},
|
||||||
|
// not matching labels
|
||||||
|
[]labels.Set{
|
||||||
|
{"foo": "bar"},
|
||||||
|
},
|
||||||
|
// matching fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "foo"},
|
||||||
|
},
|
||||||
|
// not matchin fields
|
||||||
|
[]fields.Set{
|
||||||
|
{"metadata.name": "bar"},
|
||||||
|
},
|
||||||
|
registrytest.WatchActions,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user