From e428ffe879989120721a007bfdd822bad7c33a68 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Sun, 7 Aug 2016 14:26:20 +0200 Subject: [PATCH 1/4] Federated namespace controller --- .../namespace/namespace_controller.go | 297 ++++++++++++++++++ 1 file changed, 297 insertions(+) create mode 100644 federation/pkg/federation-controller/namespace/namespace_controller.go diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go new file mode 100644 index 00000000000..e430caad2b4 --- /dev/null +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -0,0 +1,297 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "reflect" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +const ( + NamespaceReviewDelay = time.Second * 10 + ClusterAvailableDelay = time.Second * 20 + SmallDelay = time.Second * 3 + UpdateTimeout = time.Second * 30 + + allClustersKey = "ALL_CLUSTERS" +) + +type NamespaceController struct { + // For triggering single namespace reconcilation. This is used when there is an + // add/update/delete operation on a namespace in either federated API server or + // in some member of the federation. + namespaceDeliverer *util.DelayingDeliverer + + // For triggering all namespaces reconcilation. This is used when + // a new cluster becomes available. + clusterDeliverer *util.DelayingDeliverer + + // Contains namespaces present in members of federation. + namespaceFederatedInformer util.FederatedInformer + // For updating members of federation. + federatedUpdater util.FederatedUpdater + // Definitions of namespaces that should be federated. + namespaceInformerStore cache.Store + // Informer controller for namespaces that should be federated. + namespaceInformerController framework.ControllerInterface + + // Client to federated api server. + federatedApiClient federation_release_1_4.Interface + + stopChan chan struct{} +} + +// A structure passed by delying deliver. It contains a namespace that should be reconciled and +// the number of trials that were made previously and ended up in some kind of namespace-related +// error (like failure to create). +type namespaceItem struct { + namespace string + trial int64 +} + +// NewNamespaceController returns a new namespace controller +func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController { + nc := &NamespaceController{ + federatedApiClient: client, + stopChan: make(chan struct{}), + } + + // Build delivereres for triggering reconcilations. + nc.namespaceDeliverer = util.NewDelayingDeliverer() + nc.clusterDeliverer = util.NewDelayingDeliverer() + + // Start informer in federated API servers on namespaces that should be federated. + nc.namespaceInformerStore, nc.namespaceInformerController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return client.Core().Namespaces().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Namespaces().Watch(options) + }, + }, + &api.Namespace{}, + controller.NoResyncPeriodFunc(), + util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) })) + + // Federated informer on namespaces in members of federation. + nc.namespaceFederatedInformer = util.NewFederatedInformer( + client, + func(cluster *federation_api.Cluster, targetClient federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return targetClient.Core().Namespaces().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return targetClient.Core().Namespaces().Watch(options) + }, + }, + &api.Namespace{}, + controller.NoResyncPeriodFunc(), + // Trigger reconcilation whenever something in federated cluster is changed. In most cases it + // would be just confirmation that some namespace opration suceeded. + util.NewTriggerOnMetaAndSpecChangesPreproc( + func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, NamespaceReviewDelay, 0) }, + func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) }, + )) + }, + + &util.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *federation_api.Cluster) { + // When new cluster becomes available process all the namespaces again. + nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay)) + }, + }, + ) + + // Federated updeater along with Create/Update/Delete operations. + nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, + func(client federation_release_1_4.Interface, obj pkg_runtime.Object) error { + namespace := obj.(*api_v1.Namespace) + _, err := client.Core().Namespaces().Create(namespace) + return err + }, + func(client federation_release_1_4.Interface, obj pkg_runtime.Object) error { + namespace := obj.(*api_v1.Namespace) + _, err := client.Core().Namespaces().Update(namespace) + return err + }, + func(client federation_release_1_4.Interface, obj pkg_runtime.Object) error { + namespace := obj.(*api_v1.Namespace) + err := client.Core().Namespaces().Delete(namespace.Name, &api.DeleteOptions{}) + return err + }) + return nc +} + +func (nc *NamespaceController) Start() { + nc.namespaceInformerController.Run(nc.stopChan) + nc.namespaceFederatedInformer.Start() + nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + ni := item.Value.(*namespaceItem) + nc.reconcileNamespace(ni.namespace, ni.trial) + }) + nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { + nc.reconcileNamespacesOnClusterChange() + }) +} + +func (nc *NamespaceController) Stop() { + nc.namespaceFederatedInformer.Stop() + close(nc.stopChan) +} + +func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) { + namespace := obj.(*api.Namespace) + nc.deliverNamespace(namespace.Name, delay, trial) +} + +func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, trial int64) { + nc.namespaceDeliverer.DeliverAfter(namespace, &namespaceItem{namespace: namespace, trial: trial}, delay) +} + +// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet +// synced with the coresponding api server. +func (nc *NamespaceController) isSynced() bool { + if !nc.namespaceFederatedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !nc.namespaceFederatedInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + return true +} + +// The function triggers reconcilation of all federated namespaces. +func (nc *NamespaceController) reconcileNamespacesOnClusterChange() { + if !nc.isSynced() { + nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay)) + } + for _, obj := range nc.namespaceInformerStore.List() { + namespace := obj.(*api_v1.Namespace) + nc.deliverNamespace(namespace.Name, SmallDelay, 0) + } +} + +func backoff(trial int64) time.Duration { + if trial > 12 { + return 12 * 5 * time.Second + } + return time.Duration(trial) * 5 * time.Second +} + +func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) { + if !nc.isSynced() { + nc.deliverNamespace(namespace, ClusterAvailableDelay, trial) + } + + baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace) + if err != nil { + glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err) + nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + return + } + if !exist { + // Not federated namespace, ignoring. + return + } + baseNamespace := baseNamespaceObj.(*api_v1.Namespace) + + if baseNamespace.Status.Phase == api_v1.NamespaceTerminating { + // TODO: What about namespaces in subclusters ??? + err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{}) + if err != nil { + glog.Errorf("Failed to delete namespace %s: %v", baseNamespace.Name, err) + nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + } + return + } + + clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get cluster list: %v", err) + nc.deliverNamespace(namespace, ClusterAvailableDelay, trial) + return + } + + operations := make([]util.FederatedOperation, 0) + + for _, cluster := range clusters { + clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace) + if err != nil { + glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err) + nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + return + } + desiredNamespace := &api_v1.Namespace{ + ObjectMeta: baseNamespace.ObjectMeta, + Spec: baseNamespace.Spec, + } + util.SetClusterName(desiredNamespace, cluster.Name) + + if !found { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeAdd, + Obj: desiredNamespace, + }) + } else { + clusterNamespace := clusterNamespaceObj.(*api_v1.Namespace) + // Update existing namespace, if needed. + if !reflect.DeepEqual(desiredNamespace.ObjectMeta, clusterNamespace.ObjectMeta) || + !reflect.DeepEqual(desiredNamespace.Spec, clusterNamespace.Spec) { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeUpdate, + Obj: desiredNamespace, + }) + } + } + } + + if len(operations) == 0 { + // Everything is in order + return + } + err = nc.federatedUpdater.Update(operations, UpdateTimeout) + if err != nil { + glog.Errorf("Failed to execute updates for %s: %v", namespace, err) + nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + return + } + + // Evertyhing is in order but lets be double sure + nc.deliverNamespace(namespace, NamespaceReviewDelay, 0) +} From b2c192bcc9645de0915c4f9c49f24542a9ec99a6 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 12 Aug 2016 17:32:14 +0200 Subject: [PATCH 2/4] Federated Namespace Controller test --- .../namespace/namespace_controller.go | 48 ++--- .../namespace/namespace_controller_test.go | 173 ++++++++++++++++++ 2 files changed, 199 insertions(+), 22 deletions(-) create mode 100644 federation/pkg/federation-controller/namespace/namespace_controller_test.go diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index e430caad2b4..3fcfe2c4fe4 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cluster +package namespace import ( "reflect" @@ -35,11 +35,6 @@ import ( ) const ( - NamespaceReviewDelay = time.Second * 10 - ClusterAvailableDelay = time.Second * 20 - SmallDelay = time.Second * 3 - UpdateTimeout = time.Second * 30 - allClustersKey = "ALL_CLUSTERS" ) @@ -66,6 +61,11 @@ type NamespaceController struct { federatedApiClient federation_release_1_4.Interface stopChan chan struct{} + + namespaceReviewDelay time.Duration + clusterAvailableDelay time.Duration + smallDelay time.Duration + updateTimeout time.Duration } // A structure passed by delying deliver. It contains a namespace that should be reconciled and @@ -79,8 +79,12 @@ type namespaceItem struct { // NewNamespaceController returns a new namespace controller func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController { nc := &NamespaceController{ - federatedApiClient: client, - stopChan: make(chan struct{}), + federatedApiClient: client, + stopChan: make(chan struct{}), + namespaceReviewDelay: time.Second * 10, + clusterAvailableDelay: time.Second * 20, + smallDelay: time.Second * 3, + updateTimeout: time.Second * 30, } // Build delivereres for triggering reconcilations. @@ -97,7 +101,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC return client.Core().Namespaces().Watch(options) }, }, - &api.Namespace{}, + &api_v1.Namespace{}, controller.NoResyncPeriodFunc(), util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) })) @@ -114,12 +118,12 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC return targetClient.Core().Namespaces().Watch(options) }, }, - &api.Namespace{}, + &api_v1.Namespace{}, controller.NoResyncPeriodFunc(), // Trigger reconcilation whenever something in federated cluster is changed. In most cases it // would be just confirmation that some namespace opration suceeded. util.NewTriggerOnMetaAndSpecChangesPreproc( - func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, NamespaceReviewDelay, 0) }, + func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, 0) }, func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) }, )) }, @@ -127,7 +131,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federation_api.Cluster) { // When new cluster becomes available process all the namespaces again. - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay)) + nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) }, }, ) @@ -153,7 +157,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC } func (nc *NamespaceController) Start() { - nc.namespaceInformerController.Run(nc.stopChan) + go nc.namespaceInformerController.Run(nc.stopChan) nc.namespaceFederatedInformer.Start() nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { ni := item.Value.(*namespaceItem) @@ -170,7 +174,7 @@ func (nc *NamespaceController) Stop() { } func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) { - namespace := obj.(*api.Namespace) + namespace := obj.(*api_v1.Namespace) nc.deliverNamespace(namespace.Name, delay, trial) } @@ -199,11 +203,11 @@ func (nc *NamespaceController) isSynced() bool { // The function triggers reconcilation of all federated namespaces. func (nc *NamespaceController) reconcileNamespacesOnClusterChange() { if !nc.isSynced() { - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ClusterAvailableDelay)) + nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) } for _, obj := range nc.namespaceInformerStore.List() { namespace := obj.(*api_v1.Namespace) - nc.deliverNamespace(namespace.Name, SmallDelay, 0) + nc.deliverNamespace(namespace.Name, nc.smallDelay, 0) } } @@ -216,7 +220,7 @@ func backoff(trial int64) time.Duration { func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) { if !nc.isSynced() { - nc.deliverNamespace(namespace, ClusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) } baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace) @@ -225,12 +229,12 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) nc.deliverNamespace(namespace, backoff(trial+1), trial+1) return } + if !exist { // Not federated namespace, ignoring. return } baseNamespace := baseNamespaceObj.(*api_v1.Namespace) - if baseNamespace.Status.Phase == api_v1.NamespaceTerminating { // TODO: What about namespaces in subclusters ??? err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{}) @@ -244,12 +248,11 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) - nc.deliverNamespace(namespace, ClusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) return } operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace) if err != nil { @@ -270,6 +273,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) }) } else { clusterNamespace := clusterNamespaceObj.(*api_v1.Namespace) + // Update existing namespace, if needed. if !reflect.DeepEqual(desiredNamespace.ObjectMeta, clusterNamespace.ObjectMeta) || !reflect.DeepEqual(desiredNamespace.Spec, clusterNamespace.Spec) { @@ -285,7 +289,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) // Everything is in order return } - err = nc.federatedUpdater.Update(operations, UpdateTimeout) + err = nc.federatedUpdater.Update(operations, nc.updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", namespace, err) nc.deliverNamespace(namespace, backoff(trial+1), trial+1) @@ -293,5 +297,5 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) } // Evertyhing is in order but lets be double sure - nc.deliverNamespace(namespace, NamespaceReviewDelay, 0) + nc.deliverNamespace(namespace, nc.namespaceReviewDelay, 0) } diff --git a/federation/pkg/federation-controller/namespace/namespace_controller_test.go b/federation/pkg/federation-controller/namespace/namespace_controller_test.go new file mode 100644 index 00000000000..fb0be1a5574 --- /dev/null +++ b/federation/pkg/federation-controller/namespace/namespace_controller_test.go @@ -0,0 +1,173 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package namespace + +import ( + "fmt" + "testing" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "github.com/stretchr/testify/assert" +) + +func TestNamespaceController(t *testing.T) { + cluster1 := mkCluster("cluster1", api_v1.ConditionTrue) + cluster2 := mkCluster("cluster2", api_v1.ConditionTrue) + + fakeClient := &fake_federation_release_1_4.Clientset{} + RegisterList("clusters", fakeClient, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}}) + RegisterList("namespaces", fakeClient, &api_v1.NamespaceList{Items: []api_v1.Namespace{}}) + namespaceWatch := RegisterWatch("namespaces", fakeClient) + clusterWatch := RegisterWatch("clusters", fakeClient) + + cluster1Client := &fake_federation_release_1_4.Clientset{} + cluster1Watch := RegisterWatch("namespaces", cluster1Client) + RegisterList("namespaces", cluster1Client, &api_v1.NamespaceList{Items: []api_v1.Namespace{}}) + cluster1CreateChan := RegisterCopyOnCreate("namespaces", cluster1Client, cluster1Watch) + cluster1UpdateChan := RegisterCopyOnUpdate("namespaces", cluster1Client, cluster1Watch) + + cluster2Client := &fake_federation_release_1_4.Clientset{} + cluster2Watch := RegisterWatch("namespaces", cluster2Client) + RegisterList("namespaces", cluster2Client, &api_v1.NamespaceList{Items: []api_v1.Namespace{}}) + cluster2CreateChan := RegisterCopyOnCreate("namespaces", cluster2Client, cluster2Watch) + + namespaceController := NewNamespaceController(fakeClient) + informer := toFederatedInformerForTestOnly(namespaceController.namespaceFederatedInformer) + informer.SetClientFactory(func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) { + switch cluster.Name { + case cluster1.Name: + return cluster1Client, nil + case cluster2.Name: + return cluster2Client, nil + default: + return nil, fmt.Errorf("Unknown cluster") + } + }) + namespaceController.clusterAvailableDelay = 1000 * time.Millisecond + namespaceController.namespaceReviewDelay = 50 * time.Millisecond + namespaceController.smallDelay = 20 * time.Millisecond + namespaceController.updateTimeout = 5 * time.Second + namespaceController.Start() + + ns1 := api_v1.Namespace{ + ObjectMeta: api_v1.ObjectMeta{ + Name: "test-namespace", + }, + } + + // Test add federated namespace. + namespaceWatch.Add(&ns1) + createdNamespace := GetNamespaceFromChan(cluster1CreateChan) + assert.NotNil(t, createdNamespace) + assert.Equal(t, ns1.Name, createdNamespace.Name) + + // Test update federated namespace. + ns1.Annotations = map[string]string{ + "A": "B", + } + namespaceWatch.Modify(&ns1) + updatedNamespace := GetNamespaceFromChan(cluster1UpdateChan) + assert.NotNil(t, updatedNamespace) + assert.Equal(t, ns1.Name, updatedNamespace.Name) + // assert.Contains(t, updatedNamespace.Annotations, "A") + + // Test add cluster + clusterWatch.Add(cluster2) + createdNamespace2 := GetNamespaceFromChan(cluster2CreateChan) + assert.NotNil(t, createdNamespace2) + assert.Equal(t, ns1.Name, createdNamespace2.Name) + // assert.Contains(t, createdNamespace2.Annotations, "A") + + namespaceController.Stop() +} + +func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly { + inter := informer.(interface{}) + return inter.(util.FederatedInformerForTestOnly) +} + +func mkCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster { + return &federation_api.Cluster{ + ObjectMeta: api_v1.ObjectMeta{ + Name: name, + }, + Status: federation_api.ClusterStatus{ + Conditions: []federation_api.ClusterCondition{ + {Type: federation_api.ClusterReady, Status: readyStatus}, + }, + }, + } +} + +func RegisterWatch(resource string, client *fake_federation_release_1_4.Clientset) *watch.FakeWatcher { + watcher := watch.NewFake() + client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil }) + return watcher +} + +func RegisterList(resource string, client *fake_federation_release_1_4.Clientset, obj runtime.Object) { + client.AddReactor("list", resource, func(action core.Action) (bool, runtime.Object, error) { + return true, obj, nil + }) +} + +func RegisterCopyOnCreate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + obj := createAction.GetObject() + go func() { + watcher.Add(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func RegisterCopyOnUpdate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { + updateAction := action.(core.UpdateAction) + obj := updateAction.GetObject() + go func() { + watcher.Modify(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func GetNamespaceFromChan(c chan runtime.Object) *api_v1.Namespace { + select { + case obj := <-c: + namespace := obj.(*api_v1.Namespace) + return namespace + case <-time.After(time.Minute): + return nil + } +} From 378a49613fb4cc15a4a1502b487b18de1106de3d Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 12 Aug 2016 23:10:38 +0200 Subject: [PATCH 3/4] Enable federated namespace controller --- .../federation-controller-manager/app/controllermanager.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index ffa6742e0f4..2b0f1a23890 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" + namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/pkg/client/restclient" @@ -144,5 +145,10 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil { glog.Errorf("Failed to start service controller: %v", err) } + + nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller")) + namespaceController := namespacecontroller.NewNamespaceController(nsClientset) + namespaceController.Start() + select {} } From c1cbe4771b25debfd09d4381ab3f806fc506454c Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Tue, 16 Aug 2016 16:30:19 +0200 Subject: [PATCH 4/4] Use backoff from util/flowcontroll in federated namespace controller and other minor fixes --- .../app/controllermanager.go | 2 +- .../namespace/namespace_controller.go | 87 ++++++++++--------- .../namespace/namespace_controller_test.go | 8 +- 3 files changed, 50 insertions(+), 47 deletions(-) diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 2b0f1a23890..187abd71596 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -148,7 +148,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller")) namespaceController := namespacecontroller.NewNamespaceController(nsClientset) - namespaceController.Start() + namespaceController.Run(wait.NeverStop) select {} } diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index 3fcfe2c4fe4..ff14d663be9 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -60,7 +61,8 @@ type NamespaceController struct { // Client to federated api server. federatedApiClient federation_release_1_4.Interface - stopChan chan struct{} + // Backoff manager for namespaces + namespaceBackoff *flowcontrol.Backoff namespaceReviewDelay time.Duration clusterAvailableDelay time.Duration @@ -68,23 +70,15 @@ type NamespaceController struct { updateTimeout time.Duration } -// A structure passed by delying deliver. It contains a namespace that should be reconciled and -// the number of trials that were made previously and ended up in some kind of namespace-related -// error (like failure to create). -type namespaceItem struct { - namespace string - trial int64 -} - // NewNamespaceController returns a new namespace controller func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController { nc := &NamespaceController{ federatedApiClient: client, - stopChan: make(chan struct{}), namespaceReviewDelay: time.Second * 10, clusterAvailableDelay: time.Second * 20, smallDelay: time.Second * 3, updateTimeout: time.Second * 30, + namespaceBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), } // Build delivereres for triggering reconcilations. @@ -103,7 +97,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC }, &api_v1.Namespace{}, controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) })) + util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) })) // Federated informer on namespaces in members of federation. nc.namespaceFederatedInformer = util.NewFederatedInformer( @@ -123,7 +117,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC // Trigger reconcilation whenever something in federated cluster is changed. In most cases it // would be just confirmation that some namespace opration suceeded. util.NewTriggerOnMetaAndSpecChangesPreproc( - func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, 0) }, + func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) }, func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) }, )) }, @@ -131,7 +125,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federation_api.Cluster) { // When new cluster becomes available process all the namespaces again. - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) + nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay) }, }, ) @@ -156,30 +150,44 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC return nc } -func (nc *NamespaceController) Start() { - go nc.namespaceInformerController.Run(nc.stopChan) +func (nc *NamespaceController) Run(stopChan <-chan struct{}) { + go nc.namespaceInformerController.Run(stopChan) nc.namespaceFederatedInformer.Start() + go func() { + <-stopChan + nc.namespaceFederatedInformer.Stop() + }() nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - ni := item.Value.(*namespaceItem) - nc.reconcileNamespace(ni.namespace, ni.trial) + namespace := item.Value.(string) + nc.reconcileNamespace(namespace) }) nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { nc.reconcileNamespacesOnClusterChange() }) + go func() { + select { + case <-time.After(time.Minute): + nc.namespaceBackoff.GC() + case <-stopChan: + return + } + }() } -func (nc *NamespaceController) Stop() { - nc.namespaceFederatedInformer.Stop() - close(nc.stopChan) -} - -func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) { +func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, failed bool) { namespace := obj.(*api_v1.Namespace) - nc.deliverNamespace(namespace.Name, delay, trial) + nc.deliverNamespace(namespace.Name, delay, failed) } -func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, trial int64) { - nc.namespaceDeliverer.DeliverAfter(namespace, &namespaceItem{namespace: namespace, trial: trial}, delay) +// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. +func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, failed bool) { + if failed { + nc.namespaceBackoff.Next(namespace, time.Now()) + delay = delay + nc.namespaceBackoff.Get(namespace) + } else { + nc.namespaceBackoff.Reset(namespace) + } + nc.namespaceDeliverer.DeliverAfter(namespace, namespace, delay) } // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet @@ -203,30 +211,23 @@ func (nc *NamespaceController) isSynced() bool { // The function triggers reconcilation of all federated namespaces. func (nc *NamespaceController) reconcileNamespacesOnClusterChange() { if !nc.isSynced() { - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) + nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay) } for _, obj := range nc.namespaceInformerStore.List() { namespace := obj.(*api_v1.Namespace) - nc.deliverNamespace(namespace.Name, nc.smallDelay, 0) + nc.deliverNamespace(namespace.Name, nc.smallDelay, false) } } -func backoff(trial int64) time.Duration { - if trial > 12 { - return 12 * 5 * time.Second - } - return time.Duration(trial) * 5 * time.Second -} - -func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) { +func (nc *NamespaceController) reconcileNamespace(namespace string) { if !nc.isSynced() { - nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false) } baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace) if err != nil { glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) return } @@ -240,7 +241,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{}) if err != nil { glog.Errorf("Failed to delete namespace %s: %v", baseNamespace.Name, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) } return } @@ -248,7 +249,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) - nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false) return } @@ -257,7 +258,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace) if err != nil { glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) return } desiredNamespace := &api_v1.Namespace{ @@ -292,10 +293,10 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) err = nc.federatedUpdater.Update(operations, nc.updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", namespace, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) return } // Evertyhing is in order but lets be double sure - nc.deliverNamespace(namespace, nc.namespaceReviewDelay, 0) + nc.deliverNamespace(namespace, nc.namespaceReviewDelay, false) } diff --git a/federation/pkg/federation-controller/namespace/namespace_controller_test.go b/federation/pkg/federation-controller/namespace/namespace_controller_test.go index fb0be1a5574..5119fd48c07 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller_test.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller_test.go @@ -66,11 +66,13 @@ func TestNamespaceController(t *testing.T) { return nil, fmt.Errorf("Unknown cluster") } }) - namespaceController.clusterAvailableDelay = 1000 * time.Millisecond + namespaceController.clusterAvailableDelay = time.Second namespaceController.namespaceReviewDelay = 50 * time.Millisecond namespaceController.smallDelay = 20 * time.Millisecond namespaceController.updateTimeout = 5 * time.Second - namespaceController.Start() + + stop := make(chan struct{}) + namespaceController.Run(stop) ns1 := api_v1.Namespace{ ObjectMeta: api_v1.ObjectMeta{ @@ -101,7 +103,7 @@ func TestNamespaceController(t *testing.T) { assert.Equal(t, ns1.Name, createdNamespace2.Name) // assert.Contains(t, createdNamespace2.Annotations, "A") - namespaceController.Stop() + close(stop) } func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly {