diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 2b0f1a23890..187abd71596 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -148,7 +148,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller")) namespaceController := namespacecontroller.NewNamespaceController(nsClientset) - namespaceController.Start() + namespaceController.Run(wait.NeverStop) select {} } diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index 3fcfe2c4fe4..ff14d663be9 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -60,7 +61,8 @@ type NamespaceController struct { // Client to federated api server. federatedApiClient federation_release_1_4.Interface - stopChan chan struct{} + // Backoff manager for namespaces + namespaceBackoff *flowcontrol.Backoff namespaceReviewDelay time.Duration clusterAvailableDelay time.Duration @@ -68,23 +70,15 @@ type NamespaceController struct { updateTimeout time.Duration } -// A structure passed by delying deliver. It contains a namespace that should be reconciled and -// the number of trials that were made previously and ended up in some kind of namespace-related -// error (like failure to create). -type namespaceItem struct { - namespace string - trial int64 -} - // NewNamespaceController returns a new namespace controller func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController { nc := &NamespaceController{ federatedApiClient: client, - stopChan: make(chan struct{}), namespaceReviewDelay: time.Second * 10, clusterAvailableDelay: time.Second * 20, smallDelay: time.Second * 3, updateTimeout: time.Second * 30, + namespaceBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), } // Build delivereres for triggering reconcilations. @@ -103,7 +97,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC }, &api_v1.Namespace{}, controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) })) + util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) })) // Federated informer on namespaces in members of federation. nc.namespaceFederatedInformer = util.NewFederatedInformer( @@ -123,7 +117,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC // Trigger reconcilation whenever something in federated cluster is changed. In most cases it // would be just confirmation that some namespace opration suceeded. util.NewTriggerOnMetaAndSpecChangesPreproc( - func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, 0) }, + func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) }, func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) }, )) }, @@ -131,7 +125,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federation_api.Cluster) { // When new cluster becomes available process all the namespaces again. - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) + nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay) }, }, ) @@ -156,30 +150,44 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC return nc } -func (nc *NamespaceController) Start() { - go nc.namespaceInformerController.Run(nc.stopChan) +func (nc *NamespaceController) Run(stopChan <-chan struct{}) { + go nc.namespaceInformerController.Run(stopChan) nc.namespaceFederatedInformer.Start() + go func() { + <-stopChan + nc.namespaceFederatedInformer.Stop() + }() nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - ni := item.Value.(*namespaceItem) - nc.reconcileNamespace(ni.namespace, ni.trial) + namespace := item.Value.(string) + nc.reconcileNamespace(namespace) }) nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { nc.reconcileNamespacesOnClusterChange() }) + go func() { + select { + case <-time.After(time.Minute): + nc.namespaceBackoff.GC() + case <-stopChan: + return + } + }() } -func (nc *NamespaceController) Stop() { - nc.namespaceFederatedInformer.Stop() - close(nc.stopChan) -} - -func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) { +func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, failed bool) { namespace := obj.(*api_v1.Namespace) - nc.deliverNamespace(namespace.Name, delay, trial) + nc.deliverNamespace(namespace.Name, delay, failed) } -func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, trial int64) { - nc.namespaceDeliverer.DeliverAfter(namespace, &namespaceItem{namespace: namespace, trial: trial}, delay) +// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. +func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, failed bool) { + if failed { + nc.namespaceBackoff.Next(namespace, time.Now()) + delay = delay + nc.namespaceBackoff.Get(namespace) + } else { + nc.namespaceBackoff.Reset(namespace) + } + nc.namespaceDeliverer.DeliverAfter(namespace, namespace, delay) } // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet @@ -203,30 +211,23 @@ func (nc *NamespaceController) isSynced() bool { // The function triggers reconcilation of all federated namespaces. func (nc *NamespaceController) reconcileNamespacesOnClusterChange() { if !nc.isSynced() { - nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) + nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay) } for _, obj := range nc.namespaceInformerStore.List() { namespace := obj.(*api_v1.Namespace) - nc.deliverNamespace(namespace.Name, nc.smallDelay, 0) + nc.deliverNamespace(namespace.Name, nc.smallDelay, false) } } -func backoff(trial int64) time.Duration { - if trial > 12 { - return 12 * 5 * time.Second - } - return time.Duration(trial) * 5 * time.Second -} - -func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) { +func (nc *NamespaceController) reconcileNamespace(namespace string) { if !nc.isSynced() { - nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false) } baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace) if err != nil { glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) return } @@ -240,7 +241,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{}) if err != nil { glog.Errorf("Failed to delete namespace %s: %v", baseNamespace.Name, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) } return } @@ -248,7 +249,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) - nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) + nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false) return } @@ -257,7 +258,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace) if err != nil { glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) return } desiredNamespace := &api_v1.Namespace{ @@ -292,10 +293,10 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) err = nc.federatedUpdater.Update(operations, nc.updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", namespace, err) - nc.deliverNamespace(namespace, backoff(trial+1), trial+1) + nc.deliverNamespace(namespace, 0, true) return } // Evertyhing is in order but lets be double sure - nc.deliverNamespace(namespace, nc.namespaceReviewDelay, 0) + nc.deliverNamespace(namespace, nc.namespaceReviewDelay, false) } diff --git a/federation/pkg/federation-controller/namespace/namespace_controller_test.go b/federation/pkg/federation-controller/namespace/namespace_controller_test.go index fb0be1a5574..5119fd48c07 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller_test.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller_test.go @@ -66,11 +66,13 @@ func TestNamespaceController(t *testing.T) { return nil, fmt.Errorf("Unknown cluster") } }) - namespaceController.clusterAvailableDelay = 1000 * time.Millisecond + namespaceController.clusterAvailableDelay = time.Second namespaceController.namespaceReviewDelay = 50 * time.Millisecond namespaceController.smallDelay = 20 * time.Millisecond namespaceController.updateTimeout = 5 * time.Second - namespaceController.Start() + + stop := make(chan struct{}) + namespaceController.Run(stop) ns1 := api_v1.Namespace{ ObjectMeta: api_v1.ObjectMeta{ @@ -101,7 +103,7 @@ func TestNamespaceController(t *testing.T) { assert.Equal(t, ns1.Name, createdNamespace2.Name) // assert.Contains(t, createdNamespace2.Annotations, "A") - namespaceController.Stop() + close(stop) } func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly {