Merge pull request #5554 from derekwaynecarr/namespace_controller

Namespace Controller to support Namespace Termination
This commit is contained in:
Clayton Coleman 2015-03-24 12:59:00 -04:00
commit 97560c409f
29 changed files with 837 additions and 28 deletions

View File

@ -71,4 +71,4 @@ DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1
# Admission Controllers to invoke prior to persisting objects in cluster
ADMISSION_CONTROL=NamespaceAutoProvision,LimitRanger,ResourceQuota
ADMISSION_CONTROL=NamespaceLifecycle,NamespaceAutoProvision,LimitRanger,ResourceQuota

View File

@ -49,4 +49,4 @@ ELASTICSEARCH_LOGGING_REPLICAS=1
ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-true}"
# Admission Controllers to invoke prior to persisting objects in cluster
ADMISSION_CONTROL=NamespaceAutoProvision,LimitRanger,ResourceQuota
ADMISSION_CONTROL=NamespaceLifecycle,NamespaceAutoProvision,LimitRanger,ResourceQuota

View File

@ -108,4 +108,4 @@ DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1
# Admission Controllers to invoke prior to persisting objects in cluster
ADMISSION_CONTROL=NamespaceAutoProvision,LimitRanger,ResourceQuota
ADMISSION_CONTROL=NamespaceLifecycle,NamespaceAutoProvision,LimitRanger,ResourceQuota

View File

@ -49,7 +49,7 @@ MASTER_USER=vagrant
MASTER_PASSWD=vagrant
# Admission Controllers to invoke prior to persisting objects in cluster
ADMISSION_CONTROL=NamespaceAutoProvision,LimitRanger,ResourceQuota
ADMISSION_CONTROL=NamespaceLifecycle,NamespaceAutoProvision,LimitRanger,ResourceQuota
# Optional: Install node monitoring.
ENABLE_NODE_MONITORING=true

View File

@ -33,6 +33,7 @@ import (
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/namespace"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -51,6 +52,7 @@ type CMServer struct {
MinionRegexp string
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
NamespaceSyncPeriod time.Duration
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
@ -72,6 +74,7 @@ func NewCMServer() *CMServer {
Address: util.IP(net.ParseIP("127.0.0.1")),
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
NamespaceSyncPeriod: 1 * time.Minute,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
NodeMilliCPU: 1000,
@ -98,6 +101,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.")
fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
@ -176,6 +180,9 @@ func (s *CMServer) Run(_ []string) error {
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
namespaceManager := namespace.NewNamespaceManager(kubeClient)
namespaceManager.Run(s.NamespaceSyncPeriod)
select {}
return nil
}

View File

@ -95,3 +95,10 @@ func IsServiceIPSet(service *Service) bool {
func IsServiceIPRequested(service *Service) bool {
return service.Spec.PortalIP == ""
}
var standardFinalizers = util.NewStringSet(
string(FinalizerKubernetes))
func IsStandardFinalizerName(str string) bool {
return standardFinalizers.Has(str)
}

View File

@ -198,6 +198,9 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
c.FuzzNoCustom(s) // fuzz self without calling this function again
s.Type = api.SecretTypeOpaque
},
func(s *api.NamespaceSpec, c fuzz.Continue) {
s.Finalizers = []api.FinalizerName{api.FinalizerKubernetes}
},
func(s *api.NamespaceStatus, c fuzz.Continue) {
s.Phase = api.NamespaceActive
},

View File

@ -968,8 +968,17 @@ type NodeList struct {
// NamespaceSpec describes the attributes on a Namespace
type NamespaceSpec struct {
// Finalizers is an opaque list of values that must be empty to permanently remove object from storage
Finalizers []FinalizerName
}
type FinalizerName string
// These are internal finalizer values to Kubernetes, must be qualified name unless defined here
const (
FinalizerKubernetes FinalizerName = "kubernetes"
)
// NamespaceStatus is information about the current status of a Namespace.
type NamespaceStatus struct {
// Phase is the current lifecycle phase of the namespace.

View File

@ -1426,4 +1426,17 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "namespaces",
func(label, value string) (string, string, error) {
switch label {
case "status.phase":
return label, value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
}

View File

@ -784,8 +784,17 @@ type MinionList struct {
Items []Minion `json:"items" description:"list of nodes"`
}
type FinalizerName string
// These are internal finalizer values to Kubernetes, must be qualified name unless defined here
const (
FinalizerKubernetes FinalizerName = "kubernetes"
)
// NamespaceSpec describes the attributes on a Namespace
type NamespaceSpec struct {
// Finalizers is an opaque list of values that must be empty to permanently remove object from storage
Finalizers []FinalizerName `json:"finalizers,omitempty" description:"an opaque list of values that must be empty to permanently remove object from storage"`
}
// NamespaceStatus is information about the current status of a Namespace.

View File

@ -1354,4 +1354,17 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "namespaces",
func(label, value string) (string, string, error) {
switch label {
case "status.phase":
return label, value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
}

View File

@ -800,8 +800,17 @@ type MinionList struct {
Items []Minion `json:"items" description:"list of nodes"`
}
type FinalizerName string
// These are internal finalizer values to Kubernetes, must be qualified name unless defined here
const (
FinalizerKubernetes FinalizerName = "kubernetes"
)
// NamespaceSpec describes the attributes on a Namespace
type NamespaceSpec struct {
// Finalizers is an opaque list of values that must be empty to permanently remove object from storage
Finalizers []FinalizerName `json:"finalizers,omitempty" description:"an opaque list of values that must be empty to permanently remove object from storage"`
}
// NamespaceStatus is information about the current status of a Namespace.

View File

@ -60,4 +60,17 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "namespaces",
func(label, value string) (string, string, error) {
switch label {
case "status.phase":
return label, value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
}

View File

@ -952,8 +952,17 @@ type NodeList struct {
Items []Node `json:"items" description:"list of nodes"`
}
type FinalizerName string
// These are internal finalizer values to Kubernetes, must be qualified name unless defined here
const (
FinalizerKubernetes FinalizerName = "kubernetes"
)
// NamespaceSpec describes the attributes on a Namespace
type NamespaceSpec struct {
// Finalizers is an opaque list of values that must be empty to permanently remove object from storage
Finalizers []FinalizerName `json:"finalizers,omitempty" description:"an opaque list of values that must be empty to permanently remove object from storage"`
}
// NamespaceStatus is information about the current status of a Namespace.

View File

@ -1008,9 +1008,28 @@ func ValidateResourceQuotaStatusUpdate(newResourceQuota, oldResourceQuota *api.R
func ValidateNamespace(namespace *api.Namespace) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateObjectMeta(&namespace.ObjectMeta, false, ValidateNamespaceName).Prefix("metadata")...)
for i := range namespace.Spec.Finalizers {
allErrs = append(allErrs, validateFinalizerName(string(namespace.Spec.Finalizers[i]))...)
}
return allErrs
}
// Validate finalizer names
func validateFinalizerName(stringValue string) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
if !util.IsQualifiedName(stringValue) {
return append(allErrs, fmt.Errorf("finalizer name: %v, %v", stringValue, qualifiedNameErrorMsg))
}
if len(strings.Split(stringValue, "/")) == 1 {
if !api.IsStandardFinalizerName(stringValue) {
return append(allErrs, fmt.Errorf("finalizer name: %v is neither a standard finalizer name nor is it fully qualified", stringValue))
}
}
return errs.ValidationErrorList{}
}
// ValidateNamespaceUpdate tests to make sure a mamespace update can be applied. Modifies oldNamespace.
func ValidateNamespaceUpdate(oldNamespace *api.Namespace, namespace *api.Namespace) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
@ -1019,6 +1038,7 @@ func ValidateNamespaceUpdate(oldNamespace *api.Namespace, namespace *api.Namespa
// TODO: move reset function to its own location
// Ignore metadata changes now that they have been tested
oldNamespace.ObjectMeta = namespace.ObjectMeta
oldNamespace.Spec.Finalizers = namespace.Spec.Finalizers
// TODO: Add a 'real' ValidationError type for this error and provide print actual diffs.
if !api.Semantic.DeepEqual(oldNamespace, namespace) {
@ -1033,9 +1053,20 @@ func ValidateNamespaceUpdate(oldNamespace *api.Namespace, namespace *api.Namespa
func ValidateNamespaceStatusUpdate(newNamespace, oldNamespace *api.Namespace) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldNamespace.ObjectMeta, &newNamespace.ObjectMeta).Prefix("metadata")...)
if newNamespace.Status.Phase != oldNamespace.Status.Phase {
allErrs = append(allErrs, errs.NewFieldInvalid("status.phase", newNamespace.Status.Phase, "namespace phase cannot be changed directly"))
}
newNamespace.Spec = oldNamespace.Spec
return allErrs
}
// ValidateNamespaceFinalizeUpdate tests to see if the update is legal for an end user to make. newNamespace is updated with fields
// that cannot be changed.
func ValidateNamespaceFinalizeUpdate(newNamespace, oldNamespace *api.Namespace) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldNamespace.ObjectMeta, &newNamespace.ObjectMeta).Prefix("metadata")...)
for i := range newNamespace.Spec.Finalizers {
allErrs = append(allErrs, validateFinalizerName(string(newNamespace.Spec.Finalizers[i]))...)
}
newNamespace.ObjectMeta = oldNamespace.ObjectMeta
newNamespace.Status = oldNamespace.Status
fmt.Printf("NEW NAMESPACE FINALIZERS : %v\n", newNamespace.Spec.Finalizers)
return allErrs
}

View File

@ -2206,6 +2206,90 @@ func TestValidateNamespace(t *testing.T) {
}
}
func TestValidateNamespaceFinalizeUpdate(t *testing.T) {
tests := []struct {
oldNamespace api.Namespace
namespace api.Namespace
valid bool
}{
{api.Namespace{}, api.Namespace{}, true},
{api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo"}},
api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo"},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"Foo"},
},
}, false},
{api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo"},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"foo.com/bar"},
},
},
api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo"},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"foo.com/bar", "what.com/bar"},
},
}, true},
}
for i, test := range tests {
errs := ValidateNamespaceFinalizeUpdate(&test.namespace, &test.oldNamespace)
if test.valid && len(errs) > 0 {
t.Errorf("%d: Unexpected error: %v", i, errs)
t.Logf("%#v vs %#v", test.oldNamespace, test.namespace)
}
if !test.valid && len(errs) == 0 {
t.Errorf("%d: Unexpected non-error", i)
}
}
}
func TestValidateNamespaceStatusUpdate(t *testing.T) {
tests := []struct {
oldNamespace api.Namespace
namespace api.Namespace
valid bool
}{
{api.Namespace{}, api.Namespace{}, true},
{api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo"}},
api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo"},
Status: api.NamespaceStatus{
Phase: api.NamespaceTerminating,
},
}, true},
{api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo"}},
api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "bar"},
Status: api.NamespaceStatus{
Phase: api.NamespaceTerminating,
},
}, false},
}
for i, test := range tests {
errs := ValidateNamespaceStatusUpdate(&test.oldNamespace, &test.namespace)
if test.valid && len(errs) > 0 {
t.Errorf("%d: Unexpected error: %v", i, errs)
t.Logf("%#v vs %#v", test.oldNamespace.ObjectMeta, test.namespace.ObjectMeta)
}
if !test.valid && len(errs) == 0 {
t.Errorf("%d: Unexpected non-error", i)
}
}
}
func TestValidateNamespaceUpdate(t *testing.T) {
tests := []struct {
oldNamespace api.Namespace

View File

@ -29,7 +29,7 @@ type FakeNamespaces struct {
Fake *Fake
}
func (c *FakeNamespaces) List(selector labels.Selector) (*api.NamespaceList, error) {
func (c *FakeNamespaces) List(labels labels.Selector, field fields.Selector) (*api.NamespaceList, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "list-namespaces"})
return api.Scheme.CopyOrDie(&c.Fake.NamespacesList).(*api.NamespaceList), nil
}
@ -58,3 +58,13 @@ func (c *FakeNamespaces) Watch(label labels.Selector, field fields.Selector, res
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-namespaces", Value: resourceVersion})
return c.Fake.Watch, nil
}
func (c *FakeNamespaces) Finalize(namespace *api.Namespace) (*api.Namespace, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "finalize-namespace", Value: namespace.Name})
return &api.Namespace{}, nil
}
func (c *FakeNamespaces) Status(namespace *api.Namespace) (*api.Namespace, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "status-namespace", Value: namespace.Name})
return &api.Namespace{}, nil
}

View File

@ -33,10 +33,12 @@ type NamespacesInterface interface {
type NamespaceInterface interface {
Create(item *api.Namespace) (*api.Namespace, error)
Get(name string) (result *api.Namespace, err error)
List(selector labels.Selector) (*api.NamespaceList, error)
List(label labels.Selector, field fields.Selector) (*api.NamespaceList, error)
Delete(name string) error
Update(item *api.Namespace) (*api.Namespace, error)
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
Finalize(item *api.Namespace) (*api.Namespace, error)
Status(item *api.Namespace) (*api.Namespace, error)
}
// namespaces implements NamespacesInterface
@ -57,9 +59,13 @@ func (c *namespaces) Create(namespace *api.Namespace) (*api.Namespace, error) {
}
// List lists all the namespaces in the cluster.
func (c *namespaces) List(selector labels.Selector) (*api.NamespaceList, error) {
func (c *namespaces) List(label labels.Selector, field fields.Selector) (*api.NamespaceList, error) {
result := &api.NamespaceList{}
err := c.r.Get().Resource("namespaces").LabelsSelectorParam(api.LabelSelectorQueryParam(c.r.APIVersion()), selector).Do().Into(result)
err := c.r.Get().
Resource("namespaces").
LabelsSelectorParam(api.LabelSelectorQueryParam(c.r.APIVersion()), label).
FieldsSelectorParam(api.FieldSelectorQueryParam(c.r.APIVersion()), field).
Do().Into(result)
return result, err
}
@ -74,6 +80,28 @@ func (c *namespaces) Update(namespace *api.Namespace) (result *api.Namespace, er
return
}
// Finalize takes the representation of a namespace to update. Returns the server's representation of the namespace, and an error, if it occurs.
func (c *namespaces) Finalize(namespace *api.Namespace) (result *api.Namespace, err error) {
result = &api.Namespace{}
if len(namespace.ResourceVersion) == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", namespace)
return
}
err = c.r.Put().Resource("namespaces").Name(namespace.Name).SubResource("finalize").Body(namespace).Do().Into(result)
return
}
// Status takes the representation of a namespace to update. Returns the server's representation of the namespace, and an error, if it occurs.
func (c *namespaces) Status(namespace *api.Namespace) (result *api.Namespace, err error) {
result = &api.Namespace{}
if len(namespace.ResourceVersion) == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", namespace)
return
}
err = c.r.Put().Resource("namespaces").Name(namespace.Name).SubResource("status").Body(namespace).Do().Into(result)
return
}
// Get gets an existing namespace
func (c *namespaces) Get(name string) (*api.Namespace, error) {
if len(name) == 0 {

View File

@ -91,7 +91,7 @@ func TestNamespaceList(t *testing.T) {
},
Response: Response{StatusCode: 200, Body: namespaceList},
}
response, err := c.Setup().Namespaces().List(labels.Everything())
response, err := c.Setup().Namespaces().List(labels.Everything(), fields.Everything())
if err != nil {
t.Errorf("%#v should be nil.", err)
@ -117,6 +117,9 @@ func TestNamespaceUpdate(t *testing.T) {
"name": "baz",
},
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{api.FinalizerKubernetes},
},
}
c := &testClient{
Request: testRequest{Method: "PUT", Path: "/namespaces/foo"},
@ -126,6 +129,28 @@ func TestNamespaceUpdate(t *testing.T) {
c.Validate(t, receivedNamespace, err)
}
func TestNamespaceFinalize(t *testing.T) {
requestNamespace := &api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
"name": "baz",
},
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{api.FinalizerKubernetes},
},
}
c := &testClient{
Request: testRequest{Method: "PUT", Path: "/namespaces/foo/finalize"},
Response: Response{StatusCode: 200, Body: requestNamespace},
}
receivedNamespace, err := c.Setup().Namespaces().Finalize(requestNamespace)
c.Validate(t, receivedNamespace, err)
}
func TestNamespaceDelete(t *testing.T) {
c := &testClient{
Request: testRequest{Method: "DELETE", Path: "/namespaces/foo"},

View File

@ -358,7 +358,7 @@ func (m *Master) init(c *Config) {
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewStorage(c.EtcdHelper)
secretRegistry := secret.NewEtcdRegistry(c.EtcdHelper)
namespaceStorage := namespaceetcd.NewStorage(c.EtcdHelper)
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewStorage(c.EtcdHelper)
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
// TODO: split me up into distinct storage registries
@ -404,6 +404,8 @@ func (m *Master) init(c *Config) {
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secret.NewStorage(secretRegistry),
}

18
pkg/namespace/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// namespace contains a controller that handles namespace lifecycle
package namespace

View File

@ -0,0 +1,298 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// NamespaceManager is responsible for performing actions dependent upon a namespace phase
type NamespaceManager struct {
kubeClient client.Interface
store cache.Store
syncTime <-chan time.Time
// To allow injection for testing.
syncHandler func(namespace api.Namespace) error
}
// NewNamespaceManager creates a new NamespaceManager
func NewNamespaceManager(kubeClient client.Interface) *NamespaceManager {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.Namespaces().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.Namespace{},
store,
0,
)
reflector.Run()
nm := &NamespaceManager{
kubeClient: kubeClient,
store: store,
}
// set the synchronization handler
nm.syncHandler = nm.syncNamespace
return nm
}
// Run begins syncing at the specified period interval
func (nm *NamespaceManager) Run(period time.Duration) {
nm.syncTime = time.Tick(period)
go util.Forever(func() { nm.synchronize() }, period)
}
// Iterate over the each namespace that is in terminating phase and perform necessary clean-up
func (nm *NamespaceManager) synchronize() {
namespaceObjs := nm.store.List()
wg := sync.WaitGroup{}
wg.Add(len(namespaceObjs))
for ix := range namespaceObjs {
go func(ix int) {
defer wg.Done()
namespace := namespaceObjs[ix].(*api.Namespace)
glog.V(4).Infof("periodic sync of namespace: %v", namespace.Name)
err := nm.syncHandler(*namespace)
if err != nil {
glog.Errorf("Error synchronizing: %v", err)
}
}(ix)
}
wg.Wait()
}
// finalized returns true if the spec.finalizers is empty list
func finalized(namespace api.Namespace) bool {
return len(namespace.Spec.Finalizers) == 0
}
// finalize will finalize the namespace for kubernetes
func finalize(kubeClient client.Interface, namespace api.Namespace) (*api.Namespace, error) {
namespaceFinalize := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: namespace.Name,
ResourceVersion: namespace.ResourceVersion,
},
Spec: api.NamespaceSpec{},
}
finalizerSet := util.NewStringSet()
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes {
finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
}
}
namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, len(finalizerSet), len(finalizerSet))
for _, value := range finalizerSet.List() {
namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value))
}
return kubeClient.Namespaces().Finalize(&namespaceFinalize)
}
// deleteAllContent will delete all content known to the system in a namespace
func deleteAllContent(kubeClient client.Interface, namespace string) (err error) {
err = deleteServices(kubeClient, namespace)
if err != nil {
return err
}
err = deleteReplicationControllers(kubeClient, namespace)
if err != nil {
return err
}
err = deletePods(kubeClient, namespace)
if err != nil {
return err
}
err = deleteSecrets(kubeClient, namespace)
if err != nil {
return err
}
err = deleteLimitRanges(kubeClient, namespace)
if err != nil {
return err
}
err = deleteResourceQuotas(kubeClient, namespace)
if err != nil {
return err
}
err = deleteEvents(kubeClient, namespace)
if err != nil {
return err
}
return nil
}
// syncNamespace makes namespace life-cycle decisions
func (nm *NamespaceManager) syncNamespace(namespace api.Namespace) (err error) {
if namespace.DeletionTimestamp == nil {
return nil
}
// if there is a deletion timestamp, and the status is not terminating, then update status
if !namespace.DeletionTimestamp.IsZero() && namespace.Status.Phase != api.NamespaceTerminating {
newNamespace := api.Namespace{}
newNamespace.ObjectMeta = namespace.ObjectMeta
newNamespace.Status = namespace.Status
newNamespace.Status.Phase = api.NamespaceTerminating
result, err := nm.kubeClient.Namespaces().Status(&newNamespace)
if err != nil {
return err
}
// work with the latest copy so we can proceed to clean up right away without another interval
namespace = *result
}
// if the namespace is already finalized, delete it
if finalized(namespace) {
err = nm.kubeClient.Namespaces().Delete(namespace.Name)
return err
}
// there may still be content for us to remove
err = deleteAllContent(nm.kubeClient, namespace.Name)
if err != nil {
return err
}
// we have removed content, so mark it finalized by us
result, err := finalize(nm.kubeClient, namespace)
if err != nil {
return err
}
// now check if all finalizers have reported that we delete now
if finalized(*result) {
err = nm.kubeClient.Namespaces().Delete(namespace.Name)
return err
}
return nil
}
func deleteLimitRanges(kubeClient client.Interface, ns string) error {
items, err := kubeClient.LimitRanges(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.LimitRanges(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteResourceQuotas(kubeClient client.Interface, ns string) error {
resourceQuotas, err := kubeClient.ResourceQuotas(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range resourceQuotas.Items {
err := kubeClient.ResourceQuotas(ns).Delete(resourceQuotas.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteServices(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Services(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Services(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteReplicationControllers(kubeClient client.Interface, ns string) error {
items, err := kubeClient.ReplicationControllers(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.ReplicationControllers(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deletePods(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Pods(ns).List(labels.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Pods(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteEvents(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Events(ns).List(labels.Everything(), fields.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Events(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}
func deleteSecrets(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Secrets(ns).List(labels.Everything(), fields.Everything())
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Secrets(ns).Delete(items.Items[i].Name)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,129 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestFinalized(t *testing.T) {
testNamespace := api.Namespace{
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"a", "b"},
},
}
if finalized(testNamespace) {
t.Errorf("Unexpected result, namespace is not finalized")
}
testNamespace.Spec.Finalizers = []api.FinalizerName{}
if !finalized(testNamespace) {
t.Errorf("Expected object to be finalized")
}
}
func TestFinalize(t *testing.T) {
mockClient := &client.Fake{}
testNamespace := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "test",
ResourceVersion: "1",
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"kubernetes", "other"},
},
}
finalize(mockClient, testNamespace)
if len(mockClient.Actions) != 1 {
t.Errorf("Expected 1 mock client action, but got %v", len(mockClient.Actions))
}
if mockClient.Actions[0].Action != "finalize-namespace" {
t.Errorf("Expected finalize-namespace action %v", mockClient.Actions[0].Action)
}
}
func TestSyncNamespaceThatIsTerminating(t *testing.T) {
mockClient := &client.Fake{}
nm := NamespaceManager{kubeClient: mockClient, store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
now := util.Now()
testNamespace := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "test",
ResourceVersion: "1",
DeletionTimestamp: &now,
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"kubernetes"},
},
Status: api.NamespaceStatus{
Phase: api.NamespaceTerminating,
},
}
err := nm.syncNamespace(testNamespace)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}
expectedActionSet := util.NewStringSet(
"list-services",
"list-pods",
"list-resourceQuotas",
"list-controllers",
"list-secrets",
"list-limitRanges",
"list-events",
"finalize-namespace",
"delete-namespace")
actionSet := util.NewStringSet()
for i := range mockClient.Actions {
actionSet.Insert(mockClient.Actions[i].Action)
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions: %v, but got: %v", expectedActionSet, actionSet)
}
}
func TestSyncNamespaceThatIsActive(t *testing.T) {
mockClient := &client.Fake{}
nm := NamespaceManager{kubeClient: mockClient, store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
testNamespace := api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "test",
ResourceVersion: "1",
},
Spec: api.NamespaceSpec{
Finalizers: []api.FinalizerName{"kubernetes"},
},
Status: api.NamespaceStatus{
Phase: api.NamespaceActive,
},
}
err := nm.syncNamespace(testNamespace)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}
actionSet := util.NewStringSet()
for i := range mockClient.Actions {
actionSet.Insert(mockClient.Actions[i].Action)
}
if len(actionSet) != 0 {
t.Errorf("Expected no action from controller, but got: %v", actionSet)
}
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package etcd
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -25,15 +27,27 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// rest implements a RESTStorage for namespaces against etcd
type REST struct {
*etcdgeneric.Etcd
status *etcdgeneric.Etcd
}
// StatusREST implements the REST endpoint for changing the status of a namespace.
type StatusREST struct {
store *etcdgeneric.Etcd
}
// FinalizeREST implements the REST endpoint for finalizing a namespace.
type FinalizeREST struct {
store *etcdgeneric.Etcd
}
// NewStorage returns a RESTStorage object that will work against namespaces
func NewStorage(h tools.EtcdHelper) *REST {
func NewStorage(h tools.EtcdHelper) (*REST, *StatusREST, *FinalizeREST) {
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Namespace{} },
NewListFunc: func() runtime.Object { return &api.NamespaceList{} },
@ -56,5 +70,54 @@ func NewStorage(h tools.EtcdHelper) *REST {
store.UpdateStrategy = namespace.Strategy
store.ReturnDeletedObject = true
return &REST{Etcd: store}
statusStore := *store
statusStore.UpdateStrategy = namespace.StatusStrategy
finalizeStore := *store
finalizeStore.UpdateStrategy = namespace.FinalizeStrategy
return &REST{Etcd: store, status: &statusStore}, &StatusREST{store: &statusStore}, &FinalizeREST{store: &finalizeStore}
}
// Delete enforces life-cycle rules for namespace termination
func (r *REST) Delete(ctx api.Context, name string, options *api.DeleteOptions) (runtime.Object, error) {
nsObj, err := r.Get(ctx, name)
if err != nil {
return nil, err
}
namespace := nsObj.(*api.Namespace)
// upon first request to delete, we switch the phase to start namespace termination
if namespace.DeletionTimestamp == nil {
now := util.Now()
namespace.DeletionTimestamp = &now
namespace.Status.Phase = api.NamespaceTerminating
result, _, err := r.status.Update(ctx, namespace)
return result, err
}
// prior to final deletion, we must ensure that finalizers is empty
if len(namespace.Spec.Finalizers) != 0 {
err = fmt.Errorf("Unable to delete namespace %v because finalizers is not empty %v", namespace.Name, namespace.Spec.Finalizers)
}
return r.Etcd.Delete(ctx, name, nil)
}
func (r *StatusREST) New() runtime.Object {
return r.store.New()
}
// Update alters the status subset of an object.
func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}
func (r *FinalizeREST) New() runtime.Object {
return r.store.New()
}
// Update alters the status finalizers subset of an object.
func (r *FinalizeREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}

View File

@ -41,7 +41,7 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient, h := newHelper(t)
storage := NewStorage(h)
storage, _, _ := NewStorage(h)
return storage, fakeEtcdClient, h
}
@ -69,7 +69,7 @@ func TestStorage(t *testing.T) {
func TestCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage := NewStorage(helper)
storage, _, _ := NewStorage(helper)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
namespace := validNewNamespace()
namespace.ObjectMeta = api.ObjectMeta{}
@ -94,7 +94,7 @@ func expectNamespace(t *testing.T, out runtime.Object) (*api.Namespace, bool) {
func TestCreateSetsFields(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage := NewStorage(helper)
storage, _, _ := NewStorage(helper)
namespace := validNewNamespace()
_, err := storage.Create(api.NewDefaultContext(), namespace)
if err != fakeEtcdClient.Err {
@ -124,7 +124,7 @@ func TestListEmptyNamespaceList(t *testing.T) {
E: fakeEtcdClient.NewError(tools.EtcdErrorCodeNotFound),
}
storage := NewStorage(helper)
storage, _, _ := NewStorage(helper)
namespaces, err := storage.List(api.NewContext(), labels.Everything(), fields.Everything())
if err != nil {
t.Errorf("Unexpected error: %v", err)
@ -157,7 +157,7 @@ func TestListNamespaceList(t *testing.T) {
},
},
}
storage := NewStorage(helper)
storage, _, _ := NewStorage(helper)
namespacesObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
namespaces := namespacesObj.(*api.NamespaceList)
if err != nil {
@ -204,7 +204,7 @@ func TestListNamespaceListSelection(t *testing.T) {
},
},
}
storage := NewStorage(helper)
storage, _, _ := NewStorage(helper)
ctx := api.NewDefaultContext()
table := []struct {
label, field string
@ -252,9 +252,11 @@ func TestListNamespaceListSelection(t *testing.T) {
}
func TestNamespaceDecode(t *testing.T) {
storage := NewStorage(tools.EtcdHelper{})
_, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
expected := validNewNamespace()
expected.Status.Phase = api.NamespaceActive
expected.Spec.Finalizers = []api.FinalizerName{api.FinalizerKubernetes}
body, err := latest.Codec.Encode(expected)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -281,7 +283,7 @@ func TestGet(t *testing.T) {
},
},
}
storage := NewStorage(helper)
storage, _, _ := NewStorage(helper)
obj, err := storage.Get(api.NewContext(), "foo")
namespace := obj.(*api.Namespace)
if err != nil {
@ -311,8 +313,9 @@ func TestDeleteNamespace(t *testing.T) {
},
},
}
storage := NewStorage(helper)
storage, _, _ := NewStorage(helper)
_, err := storage.Delete(api.NewDefaultContext(), "foo", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -45,10 +45,27 @@ func (namespaceStrategy) NamespaceScoped() bool {
// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation.
func (namespaceStrategy) ResetBeforeCreate(obj runtime.Object) {
// on create, status is active
namespace := obj.(*api.Namespace)
namespace.Status = api.NamespaceStatus{
Phase: api.NamespaceActive,
}
// on create, we require the kubernetes value
// we cannot use this in defaults conversion because we let it get removed over life of object
hasKubeFinalizer := false
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] == api.FinalizerKubernetes {
hasKubeFinalizer = true
break
}
}
if !hasKubeFinalizer {
if len(namespace.Spec.Finalizers) == 0 {
namespace.Spec.Finalizers = []api.FinalizerName{api.FinalizerKubernetes}
} else {
namespace.Spec.Finalizers = append(namespace.Spec.Finalizers, api.FinalizerKubernetes)
}
}
}
// Validate validates a new namespace.
@ -74,10 +91,19 @@ type namespaceStatusStrategy struct {
var StatusStrategy = namespaceStatusStrategy{Strategy}
func (namespaceStatusStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList {
// TODO: merge valid fields after update
return validation.ValidateNamespaceStatusUpdate(obj.(*api.Namespace), old.(*api.Namespace))
}
type namespaceFinalizeStrategy struct {
namespaceStrategy
}
var FinalizeStrategy = namespaceFinalizeStrategy{Strategy}
func (namespaceFinalizeStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList {
return validation.ValidateNamespaceFinalizeUpdate(obj.(*api.Namespace), old.(*api.Namespace))
}
// MatchNamespace returns a generic matcher for a given label and field selector.
func MatchNamespace(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {

View File

@ -87,7 +87,7 @@ func NewProvision(c client.Interface) admission.Interface {
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.Namespaces().List(labels.Everything())
return c.Namespaces().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion)

View File

@ -86,7 +86,7 @@ func NewExists(c client.Interface) admission.Interface {
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.Namespaces().List(labels.Everything())
return c.Namespaces().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion)

View File

@ -91,7 +91,7 @@ func NewLifecycle(c client.Interface) admission.Interface {
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.Namespaces().List(labels.Everything())
return c.Namespaces().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion)