Add namespace controller to drive life-cycle

This commit is contained in:
derekwaynecarr 2015-03-20 12:49:03 -04:00
parent 29c491ef2e
commit 2ac63ebbe6
4 changed files with 451 additions and 0 deletions

View File

@ -33,6 +33,7 @@ import (
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/namespace"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -51,6 +52,7 @@ type CMServer struct {
MinionRegexp string
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
NamespaceSyncPeriod time.Duration
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
@ -72,6 +74,7 @@ func NewCMServer() *CMServer {
Address: util.IP(net.ParseIP("127.0.0.1")),
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
NamespaceSyncPeriod: 10 * time.Second,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
NodeMilliCPU: 1000,
@ -98,6 +101,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.")
fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
@ -176,6 +180,9 @@ func (s *CMServer) Run(_ []string) error {
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
namespaceManager := namespace.NewNamespaceManager(kubeClient)
namespaceManager.Run(s.NamespaceSyncPeriod)
select {}
return nil
}

18
pkg/namespace/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// namespace contains a controller that handles namespace lifecycle
package namespace

View File

@ -0,0 +1,298 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// NamespaceManager is responsible for performing actions dependent upon a namespace phase
type NamespaceManager struct {
kubeClient client.Interface
store cache.Store
syncTime <-chan time.Time
// To allow injection for testing.
syncHandler func(namespace api.Namespace) error
}
// NewNamespaceManager creates a new NamespaceManager
func NewNamespaceManager(kubeClient client.Interface) *NamespaceManager {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.Namespaces().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.Namespace{},
store,
0,
)
reflector.Run()
nm := &NamespaceManager{
kubeClient: kubeClient,
store: store,
}
// set the synchronization handler
nm.syncHandler = nm.syncNamespace
return nm
}
// Run begins syncing at the specified period interval
func (nm *NamespaceManager) Run(period time.Duration) {
nm.syncTime = time.Tick(period)
go util.Forever(func() { nm.synchronize() }, period)
}
// Iterate over the each namespace that is in terminating phase and perform necessary clean-up
func (nm *NamespaceManager) synchronize() {
namespaceObjs := nm.store.List()
wg := sync.WaitGroup{}
wg.Add(len(namespaceObjs))
for ix := range namespaceObjs {
go func(ix int) {
defer wg.Done()
namespace := namespaceObjs[ix].(*api.Namespace)
glog.V(4).Infof("periodic sync of namespace: %v", namespace.Name)
err := nm.syncHandler(*namespace)
if err != nil {
glog.Errorf("Error synchronizing: %v", err)
}
}(ix)
}
wg.Wait()
}
// finalized returns true if the spec.finalizers is empty list
func finalized(namespace api.Namespace) bool {
return len(namespace.Spec.Finalizers) == 0
}
// finalize will finalize the namespace for kubernetes
func finalize(kubeClient client.Interface, namespace api.Namespace) (*api.Namespace, error) {
namespaceFinalize := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: namespace.Name,
ResourceVersion: namespace.ResourceVersion,
},
Spec: api.NamespaceSpec{},
}
finalizerSet := util.NewStringSet()
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes {
finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
}
}
namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, len(finalizerSet), len(finalizerSet))
for _, value := range finalizerSet.List() {
namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value))
}
return kubeClient.Namespaces().Finalize(&namespaceFinalize)
}
// deleteAllContent will delete all content known to the system in a namespace
func deleteAllContent(kubeClient client.Interface, namespace string) (err error) {
err = deleteServices(kubeClient, namespace)
if err != nil {
return err
}
err = deleteReplicationControllers(kubeClient, namespace)
if err != nil {
return err
}
err = deletePods(kubeClient, namespace)
if err != nil {
return err
}
err = deleteSecrets(kubeClient, namespace)
if err != nil {
return err
}
err = deleteLimitRanges(kubeClient, namespace)
if err != nil {
return err
}
err = deleteResourceQuotas(kubeClient, namespace)
if err != nil {
return err
}
err = deleteEvents(kubeClient, namespace)
if err != nil {
return err
}
return nil
}
// syncNamespace makes namespace life-cycle decisions
func (nm *NamespaceManager) syncNamespace(namespace api.Namespace) (err error) {
if namespace.DeletionTimestamp == nil {
return nil
}
// if there is a deletion timestamp, and the status is not terminating, then update status
if namespace.DeletionTimestamp != nil && namespace.Status.Phase != api.NamespaceTerminating {
newNamespace := api.Namespace{}
newNamespace.ObjectMeta = namespace.ObjectMeta
newNamespace.Status = namespace.Status
newNamespace.Status.Phase = api.NamespaceTerminating
result, err := nm.kubeClient.Namespaces().Status(&newNamespace)
if err != nil {
return err
}
// work with the latest copy so we can proceed to clean up right away without another interval
namespace = *result
}
// if the namespace is already finalized, delete it
if finalized(namespace) {
err = nm.kubeClient.Namespaces().Delete(namespace.Name)
return err
}
// there may still be content for us to remove
err = deleteAllContent(nm.kubeClient, namespace.Name)
if err != nil {
return err
}
// we have removed content, so mark it finalized by us
result, err := finalize(nm.kubeClient, namespace)
if err != nil {
return err
}
// now check if all finalizers have reported that we delete now
if finalized(*result) {
err = nm.kubeClient.Namespaces().Delete(namespace.Name)
return err
}
return nil
}
func deleteLimitRanges(kubeClient client.Interface, ns string) error {
items, err := kubeClient.LimitRanges(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.LimitRanges(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteResourceQuotas(kubeClient client.Interface, ns string) error {
resourceQuotas, err := kubeClient.ResourceQuotas(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range resourceQuotas.Items {
err := kubeClient.ResourceQuotas(ns).Delete(resourceQuotas.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteServices(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Services(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Services(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteReplicationControllers(kubeClient client.Interface, ns string) error {
items, err := kubeClient.ReplicationControllers(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.ReplicationControllers(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deletePods(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Pods(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Pods(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteEvents(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Events(ns).List(labels.Everything(), fields.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Events(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteSecrets(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Secrets(ns).List(labels.Everything(), fields.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Secrets(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,128 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestFinalized(t *testing.T) {
testNamespace := api.Namespace{
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"a", "b"},
},
}
if finalized(testNamespace) {
t.Errorf("Unexpected result, namespace is not finalized")
}
testNamespace.Spec.Finalizers = []api.FinalizerName{}
if !finalized(testNamespace) {
t.Errorf("Expected object to be finalized")
}
}
func TestFinalize(t *testing.T) {
mockClient := &client.Fake{}
testNamespace := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "test",
ResourceVersion: "1",
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"kubernetes", "other"},
},
}
finalize(mockClient, testNamespace)
if len(mockClient.Actions) != 1 {
t.Errorf("Expected 1 mock client action, but got %v", len(mockClient.Actions))
}
if mockClient.Actions[0].Action != "finalize-namespace" {
t.Errorf("Expected finalize-namespace action %v", mockClient.Actions[0].Action)
}
}
func TestSyncNamespaceThatIsTerminating(t *testing.T) {
mockClient := &client.Fake{}
nm := NewNamespaceManager(mockClient)
now := util.Now()
testNamespace := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "test",
ResourceVersion: "1",
DeletionTimestamp: &now,
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"kubernetes"},
},
Status: api.NamespaceStatus{
Phase: api.NamespaceTerminating,
},
}
err := nm.syncNamespace(testNamespace)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}
expectedActionSet := util.NewStringSet(
"list-services",
"list-pods",
"list-resourceQuotas",
"list-controllers",
"list-secrets",
"list-limitRanges",
"list-events",
"finalize-namespace",
"delete-namespace")
actionSet := util.NewStringSet()
for i := range mockClient.Actions {
actionSet.Insert(mockClient.Actions[i].Action)
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions: %v, but got: %v", expectedActionSet, actionSet)
}
}
func TestSyncNamespaceThatIsActive(t *testing.T) {
mockClient := &client.Fake{}
nm := NewNamespaceManager(mockClient)
testNamespace := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "test",
ResourceVersion: "1",
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"kubernetes"},
},
Status: api.NamespaceStatus{
Phase: api.NamespaceActive,
},
}
err := nm.syncNamespace(testNamespace)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}
actionSet := util.NewStringSet()
for i := range mockClient.Actions {
actionSet.Insert(mockClient.Actions[i].Action)
}
if len(actionSet) != 0 {
t.Errorf("Expected no action from controller, but got: %v", actionSet)
}
}