From 2ac63ebbe6cc9f05ed6647594d2d3f1eb2e47ef7 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 20 Mar 2015 12:49:03 -0400 Subject: [PATCH] Add namespace controller to drive life-cycle --- .../app/controllermanager.go | 7 + pkg/namespace/doc.go | 18 ++ pkg/namespace/namespace_controller.go | 298 ++++++++++++++++++ pkg/namespace/namespace_controller_test.go | 128 ++++++++ 4 files changed, 451 insertions(+) create mode 100644 pkg/namespace/doc.go create mode 100644 pkg/namespace/namespace_controller.go create mode 100644 pkg/namespace/namespace_controller_test.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 7a3429f6964..1f76ecb3542 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -33,6 +33,7 @@ import ( nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" + "github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -51,6 +52,7 @@ type CMServer struct { MinionRegexp string NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration + NamespaceSyncPeriod time.Duration RegisterRetryCount int MachineList util.StringList SyncNodeList bool @@ -72,6 +74,7 @@ func NewCMServer() *CMServer { Address: util.IP(net.ParseIP("127.0.0.1")), NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, + NamespaceSyncPeriod: 10 * time.Second, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, NodeMilliCPU: 1000, @@ -98,6 +101,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "The period for syncing nodes from cloudprovider. Longer periods will result in "+ "fewer calls to cloud provider, but may delay addition of new nodes to cluster.") fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") + fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates") fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.") fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+ "The number of retries for initial node registration. Retry interval equals node_sync_period.") @@ -176,6 +180,9 @@ func (s *CMServer) Run(_ []string) error { resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod) + namespaceManager := namespace.NewNamespaceManager(kubeClient) + namespaceManager.Run(s.NamespaceSyncPeriod) + select {} return nil } diff --git a/pkg/namespace/doc.go b/pkg/namespace/doc.go new file mode 100644 index 00000000000..93fe6331f4a --- /dev/null +++ b/pkg/namespace/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// namespace contains a controller that handles namespace lifecycle +package namespace diff --git a/pkg/namespace/namespace_controller.go b/pkg/namespace/namespace_controller.go new file mode 100644 index 00000000000..e3ae3f99c44 --- /dev/null +++ b/pkg/namespace/namespace_controller.go @@ -0,0 +1,298 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package namespace + +import ( + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" +) + +// NamespaceManager is responsible for performing actions dependent upon a namespace phase +type NamespaceManager struct { + kubeClient client.Interface + store cache.Store + syncTime <-chan time.Time + + // To allow injection for testing. + syncHandler func(namespace api.Namespace) error +} + +// NewNamespaceManager creates a new NamespaceManager +func NewNamespaceManager(kubeClient client.Interface) *NamespaceManager { + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + reflector := cache.NewReflector( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return kubeClient.Namespaces().List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return kubeClient.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + }, + &api.Namespace{}, + store, + 0, + ) + reflector.Run() + nm := &NamespaceManager{ + kubeClient: kubeClient, + store: store, + } + // set the synchronization handler + nm.syncHandler = nm.syncNamespace + return nm +} + +// Run begins syncing at the specified period interval +func (nm *NamespaceManager) Run(period time.Duration) { + nm.syncTime = time.Tick(period) + go util.Forever(func() { nm.synchronize() }, period) +} + +// Iterate over the each namespace that is in terminating phase and perform necessary clean-up +func (nm *NamespaceManager) synchronize() { + namespaceObjs := nm.store.List() + wg := sync.WaitGroup{} + wg.Add(len(namespaceObjs)) + for ix := range namespaceObjs { + go func(ix int) { + defer wg.Done() + namespace := namespaceObjs[ix].(*api.Namespace) + glog.V(4).Infof("periodic sync of namespace: %v", namespace.Name) + err := nm.syncHandler(*namespace) + if err != nil { + glog.Errorf("Error synchronizing: %v", err) + } + }(ix) + } + wg.Wait() +} + +// finalized returns true if the spec.finalizers is empty list +func finalized(namespace api.Namespace) bool { + return len(namespace.Spec.Finalizers) == 0 +} + +// finalize will finalize the namespace for kubernetes +func finalize(kubeClient client.Interface, namespace api.Namespace) (*api.Namespace, error) { + namespaceFinalize := api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: namespace.Name, + ResourceVersion: namespace.ResourceVersion, + }, + Spec: api.NamespaceSpec{}, + } + finalizerSet := util.NewStringSet() + for i := range namespace.Spec.Finalizers { + if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes { + finalizerSet.Insert(string(namespace.Spec.Finalizers[i])) + } + } + namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, len(finalizerSet), len(finalizerSet)) + for _, value := range finalizerSet.List() { + namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value)) + } + return kubeClient.Namespaces().Finalize(&namespaceFinalize) +} + +// deleteAllContent will delete all content known to the system in a namespace +func deleteAllContent(kubeClient client.Interface, namespace string) (err error) { + err = deleteServices(kubeClient, namespace) + if err != nil { + return err + } + err = deleteReplicationControllers(kubeClient, namespace) + if err != nil { + return err + } + err = deletePods(kubeClient, namespace) + if err != nil { + return err + } + err = deleteSecrets(kubeClient, namespace) + if err != nil { + return err + } + err = deleteLimitRanges(kubeClient, namespace) + if err != nil { + return err + } + err = deleteResourceQuotas(kubeClient, namespace) + if err != nil { + return err + } + err = deleteEvents(kubeClient, namespace) + if err != nil { + return err + } + return nil +} + +// syncNamespace makes namespace life-cycle decisions +func (nm *NamespaceManager) syncNamespace(namespace api.Namespace) (err error) { + if namespace.DeletionTimestamp == nil { + return nil + } + + // if there is a deletion timestamp, and the status is not terminating, then update status + if namespace.DeletionTimestamp != nil && namespace.Status.Phase != api.NamespaceTerminating { + newNamespace := api.Namespace{} + newNamespace.ObjectMeta = namespace.ObjectMeta + newNamespace.Status = namespace.Status + newNamespace.Status.Phase = api.NamespaceTerminating + result, err := nm.kubeClient.Namespaces().Status(&newNamespace) + if err != nil { + return err + } + // work with the latest copy so we can proceed to clean up right away without another interval + namespace = *result + } + + // if the namespace is already finalized, delete it + if finalized(namespace) { + err = nm.kubeClient.Namespaces().Delete(namespace.Name) + return err + } + + // there may still be content for us to remove + err = deleteAllContent(nm.kubeClient, namespace.Name) + if err != nil { + return err + } + + // we have removed content, so mark it finalized by us + result, err := finalize(nm.kubeClient, namespace) + if err != nil { + return err + } + + // now check if all finalizers have reported that we delete now + if finalized(*result) { + err = nm.kubeClient.Namespaces().Delete(namespace.Name) + return err + } + + return nil +} + +func deleteLimitRanges(kubeClient client.Interface, ns string) error { + items, err := kubeClient.LimitRanges(ns).List(labels.Everything()) + if err != nil { + return err + } + for i := range items.Items { + err := kubeClient.LimitRanges(ns).Delete(items.Items[i].Name) + if err != nil { + return err + } + } + return nil +} + +func deleteResourceQuotas(kubeClient client.Interface, ns string) error { + resourceQuotas, err := kubeClient.ResourceQuotas(ns).List(labels.Everything()) + if err != nil { + return err + } + for i := range resourceQuotas.Items { + err := kubeClient.ResourceQuotas(ns).Delete(resourceQuotas.Items[i].Name) + if err != nil { + return err + } + } + return nil +} + +func deleteServices(kubeClient client.Interface, ns string) error { + items, err := kubeClient.Services(ns).List(labels.Everything()) + if err != nil { + return err + } + for i := range items.Items { + err := kubeClient.Services(ns).Delete(items.Items[i].Name) + if err != nil { + return err + } + } + return nil +} + +func deleteReplicationControllers(kubeClient client.Interface, ns string) error { + items, err := kubeClient.ReplicationControllers(ns).List(labels.Everything()) + if err != nil { + return err + } + for i := range items.Items { + err := kubeClient.ReplicationControllers(ns).Delete(items.Items[i].Name) + if err != nil { + return err + } + } + return nil +} + +func deletePods(kubeClient client.Interface, ns string) error { + items, err := kubeClient.Pods(ns).List(labels.Everything()) + if err != nil { + return err + } + for i := range items.Items { + err := kubeClient.Pods(ns).Delete(items.Items[i].Name) + if err != nil { + return err + } + } + return nil +} + +func deleteEvents(kubeClient client.Interface, ns string) error { + items, err := kubeClient.Events(ns).List(labels.Everything(), fields.Everything()) + if err != nil { + return err + } + for i := range items.Items { + err := kubeClient.Events(ns).Delete(items.Items[i].Name) + if err != nil { + return err + } + } + return nil +} + +func deleteSecrets(kubeClient client.Interface, ns string) error { + items, err := kubeClient.Secrets(ns).List(labels.Everything(), fields.Everything()) + if err != nil { + return err + } + for i := range items.Items { + err := kubeClient.Secrets(ns).Delete(items.Items[i].Name) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/namespace/namespace_controller_test.go b/pkg/namespace/namespace_controller_test.go new file mode 100644 index 00000000000..5d885f91b6c --- /dev/null +++ b/pkg/namespace/namespace_controller_test.go @@ -0,0 +1,128 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package namespace + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func TestFinalized(t *testing.T) { + testNamespace := api.Namespace{ + Spec: api.NamespaceSpec{ + Finalizers: []api.FinalizerName{"a", "b"}, + }, + } + if finalized(testNamespace) { + t.Errorf("Unexpected result, namespace is not finalized") + } + testNamespace.Spec.Finalizers = []api.FinalizerName{} + if !finalized(testNamespace) { + t.Errorf("Expected object to be finalized") + } +} + +func TestFinalize(t *testing.T) { + mockClient := &client.Fake{} + testNamespace := api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: "test", + ResourceVersion: "1", + }, + Spec: api.NamespaceSpec{ + Finalizers: []api.FinalizerName{"kubernetes", "other"}, + }, + } + finalize(mockClient, testNamespace) + if len(mockClient.Actions) != 1 { + t.Errorf("Expected 1 mock client action, but got %v", len(mockClient.Actions)) + } + if mockClient.Actions[0].Action != "finalize-namespace" { + t.Errorf("Expected finalize-namespace action %v", mockClient.Actions[0].Action) + } +} + +func TestSyncNamespaceThatIsTerminating(t *testing.T) { + mockClient := &client.Fake{} + nm := NewNamespaceManager(mockClient) + now := util.Now() + testNamespace := api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: "test", + ResourceVersion: "1", + DeletionTimestamp: &now, + }, + Spec: api.NamespaceSpec{ + Finalizers: []api.FinalizerName{"kubernetes"}, + }, + Status: api.NamespaceStatus{ + Phase: api.NamespaceTerminating, + }, + } + err := nm.syncNamespace(testNamespace) + if err != nil { + t.Errorf("Unexpected error when synching namespace %v", err) + } + expectedActionSet := util.NewStringSet( + "list-services", + "list-pods", + "list-resourceQuotas", + "list-controllers", + "list-secrets", + "list-limitRanges", + "list-events", + "finalize-namespace", + "delete-namespace") + actionSet := util.NewStringSet() + for i := range mockClient.Actions { + actionSet.Insert(mockClient.Actions[i].Action) + } + if !actionSet.HasAll(expectedActionSet.List()...) { + t.Errorf("Expected actions: %v, but got: %v", expectedActionSet, actionSet) + } +} + +func TestSyncNamespaceThatIsActive(t *testing.T) { + mockClient := &client.Fake{} + nm := NewNamespaceManager(mockClient) + testNamespace := api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: "test", + ResourceVersion: "1", + }, + Spec: api.NamespaceSpec{ + Finalizers: []api.FinalizerName{"kubernetes"}, + }, + Status: api.NamespaceStatus{ + Phase: api.NamespaceActive, + }, + } + err := nm.syncNamespace(testNamespace) + if err != nil { + t.Errorf("Unexpected error when synching namespace %v", err) + } + actionSet := util.NewStringSet() + for i := range mockClient.Actions { + actionSet.Insert(mockClient.Actions[i].Action) + } + if len(actionSet) != 0 { + t.Errorf("Expected no action from controller, but got: %v", actionSet) + } +}