From c1a3fa0dae540ebffef001d26887fb90131fa01e Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Mon, 13 Apr 2015 13:15:27 -0400 Subject: [PATCH] Remove delay when deleting namespaces, move to new controller framework --- .../app/controllermanager.go | 6 +- pkg/controller/framework/controller.go | 2 +- pkg/namespace/namespace_controller.go | 78 +++++++------------ pkg/namespace/namespace_controller_test.go | 7 +- 4 files changed, 35 insertions(+), 58 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index aa063b6a9f1..bbe80427779 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -81,7 +81,7 @@ func NewCMServer() *CMServer { Address: util.IP(net.ParseIP("127.0.0.1")), NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, - NamespaceSyncPeriod: 1 * time.Minute, + NamespaceSyncPeriod: 5 * time.Minute, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, NodeMilliCPU: 1000, @@ -207,8 +207,8 @@ func (s *CMServer) Run(_ []string) error { resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod) - namespaceManager := namespace.NewNamespaceManager(kubeClient) - namespaceManager.Run(s.NamespaceSyncPeriod) + namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod) + namespaceManager.Run() select {} return nil diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index f34913e5a02..2ed03f9bfc8 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -172,7 +172,7 @@ func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { // Parameters: // * lw is list and watch functions for the source of the resource you want to // be informed of. -// * objType is an object of the type that you expect to receieve. +// * objType is an object of the type that you expect to receive. // * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate // calls, even if nothing changed). Otherwise, re-list will be delayed as // long as possible (until the upstream source closes the watch or times out, diff --git a/pkg/namespace/namespace_controller.go b/pkg/namespace/namespace_controller.go index 667251ce8cd..cb3bff21b39 100644 --- a/pkg/namespace/namespace_controller.go +++ b/pkg/namespace/namespace_controller.go @@ -17,34 +17,28 @@ limitations under the License. package namespace import ( - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/golang/glog" ) // NamespaceManager is responsible for performing actions dependent upon a namespace phase type NamespaceManager struct { - kubeClient client.Interface - store cache.Store - syncTime <-chan time.Time - - // To allow injection for testing. - syncHandler func(namespace api.Namespace) error + controller *framework.Controller + StopEverything chan struct{} } // NewNamespaceManager creates a new NamespaceManager -func NewNamespaceManager(kubeClient client.Interface) *NamespaceManager { - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - reflector := cache.NewReflector( +func NewNamespaceManager(kubeClient client.Interface, resyncPeriod time.Duration) *NamespaceManager { + _, controller := framework.NewInformer( &cache.ListWatch{ ListFunc: func() (runtime.Object, error) { return kubeClient.Namespaces().List(labels.Everything(), fields.Everything()) @@ -54,42 +48,28 @@ func NewNamespaceManager(kubeClient client.Interface) *NamespaceManager { }, }, &api.Namespace{}, - store, - 0, + resyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + namespace := obj.(*api.Namespace) + syncNamespace(kubeClient, *namespace) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + namespace := newObj.(*api.Namespace) + syncNamespace(kubeClient, *namespace) + }, + }, ) - reflector.Run() - nm := &NamespaceManager{ - kubeClient: kubeClient, - store: store, + + return &NamespaceManager{ + controller: controller, + StopEverything: make(chan struct{}), } - // set the synchronization handler - nm.syncHandler = nm.syncNamespace - return nm } -// Run begins syncing at the specified period interval -func (nm *NamespaceManager) Run(period time.Duration) { - nm.syncTime = time.Tick(period) - go util.Forever(func() { nm.synchronize() }, period) -} - -// Iterate over the each namespace that is in terminating phase and perform necessary clean-up -func (nm *NamespaceManager) synchronize() { - namespaceObjs := nm.store.List() - wg := sync.WaitGroup{} - wg.Add(len(namespaceObjs)) - for ix := range namespaceObjs { - go func(ix int) { - defer wg.Done() - namespace := namespaceObjs[ix].(*api.Namespace) - glog.V(4).Infof("periodic sync of namespace: %v", namespace.Name) - err := nm.syncHandler(*namespace) - if err != nil { - glog.Errorf("Error synchronizing: %v", err) - } - }(ix) - } - wg.Wait() +// Run begins observing the system. It starts a goroutine and returns immediately. +func (nm *NamespaceManager) Run() { + go nm.controller.Run(nm.StopEverything) } // finalized returns true if the spec.finalizers is empty list @@ -153,7 +133,7 @@ func deleteAllContent(kubeClient client.Interface, namespace string) (err error) } // syncNamespace makes namespace life-cycle decisions -func (nm *NamespaceManager) syncNamespace(namespace api.Namespace) (err error) { +func syncNamespace(kubeClient client.Interface, namespace api.Namespace) (err error) { if namespace.DeletionTimestamp == nil { return nil } @@ -164,7 +144,7 @@ func (nm *NamespaceManager) syncNamespace(namespace api.Namespace) (err error) { newNamespace.ObjectMeta = namespace.ObjectMeta newNamespace.Status = namespace.Status newNamespace.Status.Phase = api.NamespaceTerminating - result, err := nm.kubeClient.Namespaces().Status(&newNamespace) + result, err := kubeClient.Namespaces().Status(&newNamespace) if err != nil { return err } @@ -174,25 +154,25 @@ func (nm *NamespaceManager) syncNamespace(namespace api.Namespace) (err error) { // if the namespace is already finalized, delete it if finalized(namespace) { - err = nm.kubeClient.Namespaces().Delete(namespace.Name) + err = kubeClient.Namespaces().Delete(namespace.Name) return err } // there may still be content for us to remove - err = deleteAllContent(nm.kubeClient, namespace.Name) + err = deleteAllContent(kubeClient, namespace.Name) if err != nil { return err } // we have removed content, so mark it finalized by us - result, err := finalize(nm.kubeClient, namespace) + result, err := finalize(kubeClient, namespace) if err != nil { return err } // now check if all finalizers have reported that we delete now if finalized(*result) { - err = nm.kubeClient.Namespaces().Delete(namespace.Name) + err = kubeClient.Namespaces().Delete(namespace.Name) return err } diff --git a/pkg/namespace/namespace_controller_test.go b/pkg/namespace/namespace_controller_test.go index 7550dbec1d5..cb133309701 100644 --- a/pkg/namespace/namespace_controller_test.go +++ b/pkg/namespace/namespace_controller_test.go @@ -20,7 +20,6 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -69,7 +68,6 @@ func TestFinalize(t *testing.T) { func TestSyncNamespaceThatIsTerminating(t *testing.T) { mockClient := &testclient.Fake{} - nm := NamespaceManager{kubeClient: mockClient, store: cache.NewStore(cache.MetaNamespaceKeyFunc)} now := util.Now() testNamespace := api.Namespace{ ObjectMeta: api.ObjectMeta{ @@ -84,7 +82,7 @@ func TestSyncNamespaceThatIsTerminating(t *testing.T) { Phase: api.NamespaceTerminating, }, } - err := nm.syncNamespace(testNamespace) + err := syncNamespace(mockClient, testNamespace) if err != nil { t.Errorf("Unexpected error when synching namespace %v", err) } @@ -109,7 +107,6 @@ func TestSyncNamespaceThatIsTerminating(t *testing.T) { func TestSyncNamespaceThatIsActive(t *testing.T) { mockClient := &testclient.Fake{} - nm := NamespaceManager{kubeClient: mockClient, store: cache.NewStore(cache.MetaNamespaceKeyFunc)} testNamespace := api.Namespace{ ObjectMeta: api.ObjectMeta{ Name: "test", @@ -122,7 +119,7 @@ func TestSyncNamespaceThatIsActive(t *testing.T) { Phase: api.NamespaceActive, }, } - err := nm.syncNamespace(testNamespace) + err := syncNamespace(mockClient, testNamespace) if err != nil { t.Errorf("Unexpected error when synching namespace %v", err) }