Federated Namespace Controller test

This commit is contained in:
Marcin Wielgus 2016-08-12 17:32:14 +02:00
parent e428ffe879
commit b2c192bcc9
2 changed files with 199 additions and 22 deletions

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
package namespace
import (
"reflect"
@ -35,11 +35,6 @@ import (
)
const (
NamespaceReviewDelay = time.Second * 10
ClusterAvailableDelay = time.Second * 20
SmallDelay = time.Second * 3
UpdateTimeout = time.Second * 30
allClustersKey = "ALL_CLUSTERS"
)
@ -66,6 +61,11 @@ type NamespaceController struct {
federatedApiClient federation_release_1_4.Interface
stopChan chan struct{}
namespaceReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
}
// A structure passed by delying deliver. It contains a namespace that should be reconciled and
@ -79,8 +79,12 @@ type namespaceItem struct {
// NewNamespaceController returns a new namespace controller
func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController {
nc := &NamespaceController{
federatedApiClient: client,
stopChan: make(chan struct{}),
federatedApiClient: client,
stopChan: make(chan struct{}),
namespaceReviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
}
// Build delivereres for triggering reconcilations.
@ -97,7 +101,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
return client.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
&api_v1.Namespace{},
controller.NoResyncPeriodFunc(),
util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) }))
@ -114,12 +118,12 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
return targetClient.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
&api_v1.Namespace{},
controller.NoResyncPeriodFunc(),
// Trigger reconcilation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some namespace opration suceeded.
util.NewTriggerOnMetaAndSpecChangesPreproc(
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, NamespaceReviewDelay, 0) },
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, 0) },
func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) },
))
},
@ -127,7 +131,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
// When new cluster becomes available process all the namespaces again.
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay))
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay))
},
},
)
@ -153,7 +157,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
}
func (nc *NamespaceController) Start() {
nc.namespaceInformerController.Run(nc.stopChan)
go nc.namespaceInformerController.Run(nc.stopChan)
nc.namespaceFederatedInformer.Start()
nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
ni := item.Value.(*namespaceItem)
@ -170,7 +174,7 @@ func (nc *NamespaceController) Stop() {
}
func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) {
namespace := obj.(*api.Namespace)
namespace := obj.(*api_v1.Namespace)
nc.deliverNamespace(namespace.Name, delay, trial)
}
@ -199,11 +203,11 @@ func (nc *NamespaceController) isSynced() bool {
// The function triggers reconcilation of all federated namespaces.
func (nc *NamespaceController) reconcileNamespacesOnClusterChange() {
if !nc.isSynced() {
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay))
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay))
}
for _, obj := range nc.namespaceInformerStore.List() {
namespace := obj.(*api_v1.Namespace)
nc.deliverNamespace(namespace.Name, SmallDelay, 0)
nc.deliverNamespace(namespace.Name, nc.smallDelay, 0)
}
}
@ -216,7 +220,7 @@ func backoff(trial int64) time.Duration {
func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) {
if !nc.isSynced() {
nc.deliverNamespace(namespace, ClusterAvailableDelay, trial)
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial)
}
baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace)
@ -225,12 +229,12 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1)
return
}
if !exist {
// Not federated namespace, ignoring.
return
}
baseNamespace := baseNamespaceObj.(*api_v1.Namespace)
if baseNamespace.Status.Phase == api_v1.NamespaceTerminating {
// TODO: What about namespaces in subclusters ???
err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{})
@ -244,12 +248,11 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
clusters, err := nc.namespaceFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
nc.deliverNamespace(namespace, ClusterAvailableDelay, trial)
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial)
return
}
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace)
if err != nil {
@ -270,6 +273,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
})
} else {
clusterNamespace := clusterNamespaceObj.(*api_v1.Namespace)
// Update existing namespace, if needed.
if !reflect.DeepEqual(desiredNamespace.ObjectMeta, clusterNamespace.ObjectMeta) ||
!reflect.DeepEqual(desiredNamespace.Spec, clusterNamespace.Spec) {
@ -285,7 +289,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
// Everything is in order
return
}
err = nc.federatedUpdater.Update(operations, UpdateTimeout)
err = nc.federatedUpdater.Update(operations, nc.updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", namespace, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1)
@ -293,5 +297,5 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
}
// Evertyhing is in order but lets be double sure
nc.deliverNamespace(namespace, NamespaceReviewDelay, 0)
nc.deliverNamespace(namespace, nc.namespaceReviewDelay, 0)
}

View File

@ -0,0 +1,173 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"fmt"
"testing"
"time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/stretchr/testify/assert"
)
func TestNamespaceController(t *testing.T) {
cluster1 := mkCluster("cluster1", api_v1.ConditionTrue)
cluster2 := mkCluster("cluster2", api_v1.ConditionTrue)
fakeClient := &fake_federation_release_1_4.Clientset{}
RegisterList("clusters", fakeClient, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
RegisterList("namespaces", fakeClient, &api_v1.NamespaceList{Items: []api_v1.Namespace{}})
namespaceWatch := RegisterWatch("namespaces", fakeClient)
clusterWatch := RegisterWatch("clusters", fakeClient)
cluster1Client := &fake_federation_release_1_4.Clientset{}
cluster1Watch := RegisterWatch("namespaces", cluster1Client)
RegisterList("namespaces", cluster1Client, &api_v1.NamespaceList{Items: []api_v1.Namespace{}})
cluster1CreateChan := RegisterCopyOnCreate("namespaces", cluster1Client, cluster1Watch)
cluster1UpdateChan := RegisterCopyOnUpdate("namespaces", cluster1Client, cluster1Watch)
cluster2Client := &fake_federation_release_1_4.Clientset{}
cluster2Watch := RegisterWatch("namespaces", cluster2Client)
RegisterList("namespaces", cluster2Client, &api_v1.NamespaceList{Items: []api_v1.Namespace{}})
cluster2CreateChan := RegisterCopyOnCreate("namespaces", cluster2Client, cluster2Watch)
namespaceController := NewNamespaceController(fakeClient)
informer := toFederatedInformerForTestOnly(namespaceController.namespaceFederatedInformer)
informer.SetClientFactory(func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
default:
return nil, fmt.Errorf("Unknown cluster")
}
})
namespaceController.clusterAvailableDelay = 1000 * time.Millisecond
namespaceController.namespaceReviewDelay = 50 * time.Millisecond
namespaceController.smallDelay = 20 * time.Millisecond
namespaceController.updateTimeout = 5 * time.Second
namespaceController.Start()
ns1 := api_v1.Namespace{
ObjectMeta: api_v1.ObjectMeta{
Name: "test-namespace",
},
}
// Test add federated namespace.
namespaceWatch.Add(&ns1)
createdNamespace := GetNamespaceFromChan(cluster1CreateChan)
assert.NotNil(t, createdNamespace)
assert.Equal(t, ns1.Name, createdNamespace.Name)
// Test update federated namespace.
ns1.Annotations = map[string]string{
"A": "B",
}
namespaceWatch.Modify(&ns1)
updatedNamespace := GetNamespaceFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedNamespace)
assert.Equal(t, ns1.Name, updatedNamespace.Name)
// assert.Contains(t, updatedNamespace.Annotations, "A")
// Test add cluster
clusterWatch.Add(cluster2)
createdNamespace2 := GetNamespaceFromChan(cluster2CreateChan)
assert.NotNil(t, createdNamespace2)
assert.Equal(t, ns1.Name, createdNamespace2.Name)
// assert.Contains(t, createdNamespace2.Annotations, "A")
namespaceController.Stop()
}
func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly {
inter := informer.(interface{})
return inter.(util.FederatedInformerForTestOnly)
}
func mkCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster {
return &federation_api.Cluster{
ObjectMeta: api_v1.ObjectMeta{
Name: name,
},
Status: federation_api.ClusterStatus{
Conditions: []federation_api.ClusterCondition{
{Type: federation_api.ClusterReady, Status: readyStatus},
},
},
}
}
func RegisterWatch(resource string, client *fake_federation_release_1_4.Clientset) *watch.FakeWatcher {
watcher := watch.NewFake()
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil })
return watcher
}
func RegisterList(resource string, client *fake_federation_release_1_4.Clientset, obj runtime.Object) {
client.AddReactor("list", resource, func(action core.Action) (bool, runtime.Object, error) {
return true, obj, nil
})
}
func RegisterCopyOnCreate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object {
objChan := make(chan runtime.Object, 100)
client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
obj := createAction.GetObject()
go func() {
watcher.Add(obj)
objChan <- obj
}()
return true, obj, nil
})
return objChan
}
func RegisterCopyOnUpdate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object {
objChan := make(chan runtime.Object, 100)
client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
updateAction := action.(core.UpdateAction)
obj := updateAction.GetObject()
go func() {
watcher.Modify(obj)
objChan <- obj
}()
return true, obj, nil
})
return objChan
}
func GetNamespaceFromChan(c chan runtime.Object) *api_v1.Namespace {
select {
case obj := <-c:
namespace := obj.(*api_v1.Namespace)
return namespace
case <-time.After(time.Minute):
return nil
}
}