From b2c192bcc9645de0915c4f9c49f24542a9ec99a6 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 12 Aug 2016 17:32:14 +0200 Subject: [PATCH] Federated Namespace Controller test --- .../namespace/namespace_controller.go | 48 ++--- .../namespace/namespace_controller_test.go | 173 ++++++++++++++++++ 2 files changed, 199 insertions(+), 22 deletions(-) create mode 100644 federation/pkg/federation-controller/namespace/namespace_controller_test.go diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index e430caad2b4..3fcfe2c4fe4 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cluster +package namespace import ( "reflect" @@ -35,11 +35,6 @@ import ( ) const ( - NamespaceReviewDelay = time.Second * 10 - ClusterAvailableDelay = time.Second * 20 - SmallDelay = time.Second * 3 - UpdateTimeout = time.Second * 30 - allClustersKey = "ALL_CLUSTERS" ) @@ -66,6 +61,11 @@ type NamespaceController struct { federatedApiClient federation_release_1_4.Interface stopChan chan struct{} + + namespaceReviewDelay time.Duration + clusterAvailableDelay time.Duration + smallDelay time.Duration + updateTimeout time.Duration } // A structure passed by delying deliver. It contains a namespace that should be reconciled and @@ -79,8 +79,12 @@ type namespaceItem struct { // NewNamespaceController returns a new namespace controller func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController { nc := &NamespaceController{ - federatedApiClient: client, - stopChan: make(chan struct{}), + federatedApiClient: client, + stopChan: make(chan struct{}), + namespaceReviewDelay: time.Second * 10, + clusterAvailableDelay: time.Second * 20, + smallDelay: time.Second * 3, + updateTimeout: time.Second * 30, } // Build delivereres for triggering reconcilations. @@ -97,7 +101,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC return client.Core().Namespaces().Watch(options) }, }, - &api.Namespace{}, + &api_v1.Namespace{}, controller.NoResyncPeriodFunc(), util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) })) @@ -114,12 +118,12 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC return targetClient.Core().Namespaces().Watch(options) }, }, - &api.Namespace{}, + &api_v1.Namespace{}, controller.NoResyncPeriodFunc(), // Trigger reconcilation whenever something in federated cluster is changed. In most cases it // would be just confirmation that some namespace opration suceeded. util.NewTriggerOnMetaAndSpecChangesPreproc( - func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, NamespaceReviewDelay, 0) }, + func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, 0) }, func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) }, )) }, @@ -127,7 +131,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federation_api.Cluster) { // When new cluster becomes available process all the namespaces again. - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay)) + nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) }, }, ) @@ -153,7 +157,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC } func (nc *NamespaceController) Start() { - nc.namespaceInformerController.Run(nc.stopChan) + go nc.namespaceInformerController.Run(nc.stopChan) nc.namespaceFederatedInformer.Start() nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { ni := item.Value.(*namespaceItem) @@ -170,7 +174,7 @@ func (nc *NamespaceController) Stop() { } func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) { - namespace := obj.(*api.Namespace) + namespace := obj.(*api_v1.Namespace) nc.deliverNamespace(namespace.Name, delay, trial) } @@ -199,11 +203,11 @@ func (nc *NamespaceController) isSynced() bool { // The function triggers reconcilation of all federated namespaces. func (nc *NamespaceController) reconcileNamespacesOnClusterChange() { if !nc.isSynced() { - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay)) + nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) } for _, obj := range nc.namespaceInformerStore.List() { namespace := obj.(*api_v1.Namespace) - nc.deliverNamespace(namespace.Name, SmallDelay, 0) + nc.deliverNamespace(namespace.Name, nc.smallDelay, 0) } } @@ -216,7 +220,7 @@ func backoff(trial int64) time.Duration { func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) { if !nc.isSynced() { - nc.deliverNamespace(namespace, ClusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) } baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace) @@ -225,12 +229,12 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) nc.deliverNamespace(namespace, backoff(trial+1), trial+1) return } + if !exist { // Not federated namespace, ignoring. return } baseNamespace := baseNamespaceObj.(*api_v1.Namespace) - if baseNamespace.Status.Phase == api_v1.NamespaceTerminating { // TODO: What about namespaces in subclusters ??? err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{}) @@ -244,12 +248,11 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) - nc.deliverNamespace(namespace, ClusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) return } operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace) if err != nil { @@ -270,6 +273,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) }) } else { clusterNamespace := clusterNamespaceObj.(*api_v1.Namespace) + // Update existing namespace, if needed. if !reflect.DeepEqual(desiredNamespace.ObjectMeta, clusterNamespace.ObjectMeta) || !reflect.DeepEqual(desiredNamespace.Spec, clusterNamespace.Spec) { @@ -285,7 +289,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) // Everything is in order return } - err = nc.federatedUpdater.Update(operations, UpdateTimeout) + err = nc.federatedUpdater.Update(operations, nc.updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", namespace, err) nc.deliverNamespace(namespace, backoff(trial+1), trial+1) @@ -293,5 +297,5 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) } // Evertyhing is in order but lets be double sure - nc.deliverNamespace(namespace, NamespaceReviewDelay, 0) + nc.deliverNamespace(namespace, nc.namespaceReviewDelay, 0) } diff --git a/federation/pkg/federation-controller/namespace/namespace_controller_test.go b/federation/pkg/federation-controller/namespace/namespace_controller_test.go new file mode 100644 index 00000000000..fb0be1a5574 --- /dev/null +++ b/federation/pkg/federation-controller/namespace/namespace_controller_test.go @@ -0,0 +1,173 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package namespace + +import ( + "fmt" + "testing" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "github.com/stretchr/testify/assert" +) + +func TestNamespaceController(t *testing.T) { + cluster1 := mkCluster("cluster1", api_v1.ConditionTrue) + cluster2 := mkCluster("cluster2", api_v1.ConditionTrue) + + fakeClient := &fake_federation_release_1_4.Clientset{} + RegisterList("clusters", fakeClient, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}}) + RegisterList("namespaces", fakeClient, &api_v1.NamespaceList{Items: []api_v1.Namespace{}}) + namespaceWatch := RegisterWatch("namespaces", fakeClient) + clusterWatch := RegisterWatch("clusters", fakeClient) + + cluster1Client := &fake_federation_release_1_4.Clientset{} + cluster1Watch := RegisterWatch("namespaces", cluster1Client) + RegisterList("namespaces", cluster1Client, &api_v1.NamespaceList{Items: []api_v1.Namespace{}}) + cluster1CreateChan := RegisterCopyOnCreate("namespaces", cluster1Client, cluster1Watch) + cluster1UpdateChan := RegisterCopyOnUpdate("namespaces", cluster1Client, cluster1Watch) + + cluster2Client := &fake_federation_release_1_4.Clientset{} + cluster2Watch := RegisterWatch("namespaces", cluster2Client) + RegisterList("namespaces", cluster2Client, &api_v1.NamespaceList{Items: []api_v1.Namespace{}}) + cluster2CreateChan := RegisterCopyOnCreate("namespaces", cluster2Client, cluster2Watch) + + namespaceController := NewNamespaceController(fakeClient) + informer := toFederatedInformerForTestOnly(namespaceController.namespaceFederatedInformer) + informer.SetClientFactory(func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) { + switch cluster.Name { + case cluster1.Name: + return cluster1Client, nil + case cluster2.Name: + return cluster2Client, nil + default: + return nil, fmt.Errorf("Unknown cluster") + } + }) + namespaceController.clusterAvailableDelay = 1000 * time.Millisecond + namespaceController.namespaceReviewDelay = 50 * time.Millisecond + namespaceController.smallDelay = 20 * time.Millisecond + namespaceController.updateTimeout = 5 * time.Second + namespaceController.Start() + + ns1 := api_v1.Namespace{ + ObjectMeta: api_v1.ObjectMeta{ + Name: "test-namespace", + }, + } + + // Test add federated namespace. + namespaceWatch.Add(&ns1) + createdNamespace := GetNamespaceFromChan(cluster1CreateChan) + assert.NotNil(t, createdNamespace) + assert.Equal(t, ns1.Name, createdNamespace.Name) + + // Test update federated namespace. + ns1.Annotations = map[string]string{ + "A": "B", + } + namespaceWatch.Modify(&ns1) + updatedNamespace := GetNamespaceFromChan(cluster1UpdateChan) + assert.NotNil(t, updatedNamespace) + assert.Equal(t, ns1.Name, updatedNamespace.Name) + // assert.Contains(t, updatedNamespace.Annotations, "A") + + // Test add cluster + clusterWatch.Add(cluster2) + createdNamespace2 := GetNamespaceFromChan(cluster2CreateChan) + assert.NotNil(t, createdNamespace2) + assert.Equal(t, ns1.Name, createdNamespace2.Name) + // assert.Contains(t, createdNamespace2.Annotations, "A") + + namespaceController.Stop() +} + +func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly { + inter := informer.(interface{}) + return inter.(util.FederatedInformerForTestOnly) +} + +func mkCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster { + return &federation_api.Cluster{ + ObjectMeta: api_v1.ObjectMeta{ + Name: name, + }, + Status: federation_api.ClusterStatus{ + Conditions: []federation_api.ClusterCondition{ + {Type: federation_api.ClusterReady, Status: readyStatus}, + }, + }, + } +} + +func RegisterWatch(resource string, client *fake_federation_release_1_4.Clientset) *watch.FakeWatcher { + watcher := watch.NewFake() + client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil }) + return watcher +} + +func RegisterList(resource string, client *fake_federation_release_1_4.Clientset, obj runtime.Object) { + client.AddReactor("list", resource, func(action core.Action) (bool, runtime.Object, error) { + return true, obj, nil + }) +} + +func RegisterCopyOnCreate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + obj := createAction.GetObject() + go func() { + watcher.Add(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func RegisterCopyOnUpdate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { + updateAction := action.(core.UpdateAction) + obj := updateAction.GetObject() + go func() { + watcher.Modify(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func GetNamespaceFromChan(c chan runtime.Object) *api_v1.Namespace { + select { + case obj := <-c: + namespace := obj.(*api_v1.Namespace) + return namespace + case <-time.After(time.Minute): + return nil + } +}