diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index fb6d54355f9..0bf01f8aace 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -298,7 +298,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig // Find the list of namespaced resources via discovery that the namespace controller must manage namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")) namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc) - groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery()) + groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources() if err != nil { glog.Fatalf("Failed to get supported resources from server: %v", err) } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index ea7cc158f33..d2dc45ce4e1 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -220,7 +220,7 @@ func (s *CMServer) Run(_ []string) error { // Find the list of namespaced resources via discovery that the namespace controller must manage namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")) namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc) - groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery()) + groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources() if err != nil { glog.Fatalf("Failed to get supported resources from server: %v", err) } diff --git a/hack/test-integration.sh b/hack/test-integration.sh index 304e7d82e7f..0b9844cfe5e 100755 --- a/hack/test-integration.sh +++ b/hack/test-integration.sh @@ -32,7 +32,9 @@ source "${KUBE_ROOT}/hack/lib/init.sh" KUBE_TEST_API_VERSIONS=${KUBE_TEST_API_VERSIONS:-"v1,extensions/v1beta1;v1,autoscaling/v1,batch/v1,apps/v1alpha1,policy/v1alpha1,extensions/v1beta1"} # Give integration tests longer to run -KUBE_TIMEOUT=${KUBE_TIMEOUT:--timeout 240s} +# TODO: allow a larger value to be passed in +#KUBE_TIMEOUT=${KUBE_TIMEOUT:--timeout 240s} +KUBE_TIMEOUT="-timeout 1200s" KUBE_INTEGRATION_TEST_MAX_CONCURRENCY=${KUBE_INTEGRATION_TEST_MAX_CONCURRENCY:-"-1"} LOG_LEVEL=${LOG_LEVEL:-2} diff --git a/pkg/api/meta.go b/pkg/api/meta.go index d5590f49d24..d0cddcadb07 100644 --- a/pkg/api/meta.go +++ b/pkg/api/meta.go @@ -17,6 +17,7 @@ limitations under the License. package api import ( + "k8s.io/kubernetes/pkg/api/meta/metatypes" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" @@ -89,3 +90,25 @@ func (meta *ObjectMeta) GetLabels() map[string]string { return m func (meta *ObjectMeta) SetLabels(labels map[string]string) { meta.Labels = labels } func (meta *ObjectMeta) GetAnnotations() map[string]string { return meta.Annotations } func (meta *ObjectMeta) SetAnnotations(annotations map[string]string) { meta.Annotations = annotations } + +func (meta *ObjectMeta) GetOwnerReferences() []metatypes.OwnerReference { + ret := make([]metatypes.OwnerReference, len(meta.OwnerReferences)) + for i := 0; i < len(meta.OwnerReferences); i++ { + ret[i].Kind = meta.OwnerReferences[i].Kind + ret[i].Name = meta.OwnerReferences[i].Name + ret[i].UID = meta.OwnerReferences[i].UID + ret[i].APIVersion = meta.OwnerReferences[i].APIVersion + } + return ret +} + +func (meta *ObjectMeta) SetOwnerReferences(references []metatypes.OwnerReference) { + newReferences := make([]OwnerReference, len(references)) + for i := 0; i < len(references); i++ { + newReferences[i].Kind = references[i].Kind + newReferences[i].Name = references[i].Name + newReferences[i].UID = references[i].UID + newReferences[i].APIVersion = references[i].APIVersion + } + meta.OwnerReferences = newReferences +} diff --git a/pkg/api/meta/interfaces.go b/pkg/api/meta/interfaces.go index ffebcf34362..36d41e8bda7 100644 --- a/pkg/api/meta/interfaces.go +++ b/pkg/api/meta/interfaces.go @@ -17,6 +17,7 @@ limitations under the License. package meta import ( + "k8s.io/kubernetes/pkg/api/meta/metatypes" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" @@ -57,6 +58,14 @@ type Object interface { SetLabels(labels map[string]string) GetAnnotations() map[string]string SetAnnotations(annotations map[string]string) + GetOwnerReferences() []metatypes.OwnerReference + SetOwnerReferences([]metatypes.OwnerReference) +} + +var _ Object = &runtime.Unstructured{} + +type ListMetaAccessor interface { + GetListMeta() List } // List lets you work with list metadata from any of the versioned or @@ -177,5 +186,3 @@ type RESTMapper interface { AliasesForResource(resource string) ([]string, bool) ResourceSingularizer(resource string) (singular string, err error) } - -var _ Object = &runtime.Unstructured{} diff --git a/pkg/api/meta/meta.go b/pkg/api/meta/meta.go index 4001d6969bd..37ba93bdd92 100644 --- a/pkg/api/meta/meta.go +++ b/pkg/api/meta/meta.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" + "k8s.io/kubernetes/pkg/api/meta/metatypes" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" @@ -28,19 +29,53 @@ import ( "github.com/golang/glog" ) +func ListAccessor(obj interface{}) (List, error) { + if listMetaAccessor, ok := obj.(ListMetaAccessor); ok { + if om := listMetaAccessor.GetListMeta(); om != nil { + return om, nil + } + } + // we may get passed an object that is directly portable to List + if list, ok := obj.(List); ok { + return list, nil + } + glog.V(4).Infof("Calling ListAccessor on non-internal object: %v", reflect.TypeOf(obj)) + // legacy path for objects that do not implement List and ListMetaAccessor via + // reflection - very slow code path. + v, err := conversion.EnforcePtr(obj) + if err != nil { + return nil, err + } + t := v.Type() + if v.Kind() != reflect.Struct { + return nil, fmt.Errorf("expected struct, but got %v: %v (%#v)", v.Kind(), t, v.Interface()) + } + a := &genericAccessor{} + listMeta := v.FieldByName("ListMeta") + if listMeta.IsValid() { + // look for the ListMeta fields + if err := extractFromListMeta(listMeta, a); err != nil { + return nil, fmt.Errorf("unable to find list fields on %#v: %v", listMeta, err) + } + } else { + return nil, fmt.Errorf("unable to find listMeta on %#v", v) + } + return a, nil +} + // Accessor takes an arbitrary object pointer and returns meta.Interface. // obj must be a pointer to an API type. An error is returned if the minimum // required fields are missing. Fields that are not required return the default // value and are a no-op if set. func Accessor(obj interface{}) (Object, error) { - if oi, ok := obj.(ObjectMetaAccessor); ok { - if om := oi.GetObjectMeta(); om != nil { + if objectMetaAccessor, ok := obj.(ObjectMetaAccessor); ok { + if om := objectMetaAccessor.GetObjectMeta(); om != nil { return om, nil } } // we may get passed an object that is directly portable to Object - if oi, ok := obj.(Object); ok { - return oi, nil + if object, ok := obj.(Object); ok { + return object, nil } glog.V(4).Infof("Calling Accessor on non-internal object: %v", reflect.TypeOf(obj)) @@ -310,6 +345,40 @@ func (resourceAccessor) SetResourceVersion(obj runtime.Object, version string) e return nil } +// extractFromOwnerReference extracts v to o. v is the OwnerReferences field of an object. +func extractFromOwnerReference(v reflect.Value, o *metatypes.OwnerReference) error { + if err := runtime.Field(v, "APIVersion", &o.APIVersion); err != nil { + return err + } + if err := runtime.Field(v, "Kind", &o.Kind); err != nil { + return err + } + if err := runtime.Field(v, "Name", &o.Name); err != nil { + return err + } + if err := runtime.Field(v, "UID", &o.UID); err != nil { + return err + } + return nil +} + +// setOwnerReference sets v to o. v is the OwnerReferences field of an object. +func setOwnerReference(v reflect.Value, o *metatypes.OwnerReference) error { + if err := runtime.SetField(o.APIVersion, v, "APIVersion"); err != nil { + return err + } + if err := runtime.SetField(o.Kind, v, "Kind"); err != nil { + return err + } + if err := runtime.SetField(o.Name, v, "Name"); err != nil { + return err + } + if err := runtime.SetField(o.UID, v, "UID"); err != nil { + return err + } + return nil +} + // genericAccessor contains pointers to strings that can modify an arbitrary // struct and implements the Accessor interface. type genericAccessor struct { @@ -325,6 +394,7 @@ type genericAccessor struct { deletionTimestamp **unversioned.Time labels *map[string]string annotations *map[string]string + ownerReferences reflect.Value } func (a genericAccessor) GetNamespace() string { @@ -457,6 +527,41 @@ func (a genericAccessor) SetAnnotations(annotations map[string]string) { *a.annotations = annotations } +func (a genericAccessor) GetOwnerReferences() []metatypes.OwnerReference { + var ret []metatypes.OwnerReference + s := a.ownerReferences + if s.Kind() != reflect.Ptr || s.Elem().Kind() != reflect.Slice { + glog.Errorf("expect %v to be a pointer to slice", s) + return ret + } + s = s.Elem() + // Set the capacity to one element greater to avoid copy if the caller later append an element. + ret = make([]metatypes.OwnerReference, s.Len(), s.Len()+1) + for i := 0; i < s.Len(); i++ { + if err := extractFromOwnerReference(s.Index(i), &ret[i]); err != nil { + glog.Errorf("extractFromOwnerReference failed: %v", err) + return ret + } + } + return ret +} + +func (a genericAccessor) SetOwnerReferences(references []metatypes.OwnerReference) { + s := a.ownerReferences + if s.Kind() != reflect.Ptr || s.Elem().Kind() != reflect.Slice { + glog.Errorf("expect %v to be a pointer to slice", s) + } + s = s.Elem() + newReferences := reflect.MakeSlice(s.Type(), len(references), len(references)) + for i := 0; i < len(references); i++ { + if err := setOwnerReference(newReferences.Index(i), &references[i]); err != nil { + glog.Errorf("setOwnerReference failed: %v", err) + return + } + } + s.Set(newReferences) +} + // extractFromTypeMeta extracts pointers to version and kind fields from an object func extractFromTypeMeta(v reflect.Value, a *genericAccessor) error { if err := runtime.FieldPtr(v, "APIVersion", &a.apiVersion); err != nil { @@ -494,6 +599,14 @@ func extractFromObjectMeta(v reflect.Value, a *genericAccessor) error { if err := runtime.FieldPtr(v, "Annotations", &a.annotations); err != nil { return err } + ownerReferences := v.FieldByName("OwnerReferences") + if !ownerReferences.IsValid() { + return fmt.Errorf("struct %#v lacks OwnerReferences type", v) + } + if ownerReferences.Kind() != reflect.Slice { + return fmt.Errorf("expect %v to be a slice", ownerReferences.Kind()) + } + a.ownerReferences = ownerReferences.Addr() return nil } diff --git a/pkg/api/meta/meta_test.go b/pkg/api/meta/meta_test.go index 052812c6fa5..83daec0845c 100644 --- a/pkg/api/meta/meta_test.go +++ b/pkg/api/meta/meta_test.go @@ -20,9 +20,13 @@ import ( "reflect" "testing" + "github.com/google/gofuzz" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/api/meta/metatypes" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" ) @@ -127,17 +131,18 @@ func TestAPIObjectMeta(t *testing.T) { func TestGenericTypeMeta(t *testing.T) { type TypeMeta struct { - Kind string `json:"kind,omitempty"` - Namespace string `json:"namespace,omitempty"` - Name string `json:"name,omitempty"` - GenerateName string `json:"generateName,omitempty"` - UID string `json:"uid,omitempty"` - CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"` - SelfLink string `json:"selfLink,omitempty"` - ResourceVersion string `json:"resourceVersion,omitempty"` - APIVersion string `json:"apiVersion,omitempty"` - Labels map[string]string `json:"labels,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` + Kind string `json:"kind,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + GenerateName string `json:"generateName,omitempty"` + UID string `json:"uid,omitempty"` + CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"` + SelfLink string `json:"selfLink,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` + APIVersion string `json:"apiVersion,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"` } type Object struct { TypeMeta `json:",inline"` @@ -236,18 +241,20 @@ func TestGenericTypeMeta(t *testing.T) { } type InternalTypeMeta struct { - Kind string `json:"kind,omitempty"` - Namespace string `json:"namespace,omitempty"` - Name string `json:"name,omitempty"` - GenerateName string `json:"generateName,omitempty"` - UID string `json:"uid,omitempty"` - CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"` - SelfLink string `json:"selfLink,omitempty"` - ResourceVersion string `json:"resourceVersion,omitempty"` - APIVersion string `json:"apiVersion,omitempty"` - Labels map[string]string `json:"labels,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` + Kind string `json:"kind,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + GenerateName string `json:"generateName,omitempty"` + UID string `json:"uid,omitempty"` + CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"` + SelfLink string `json:"selfLink,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` + APIVersion string `json:"apiVersion,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"` } + type InternalObject struct { TypeMeta InternalTypeMeta `json:",inline"` } @@ -273,6 +280,7 @@ func TestGenericTypeMetaAccessor(t *testing.T) { SelfLink: "some/place/only/we/know", Labels: map[string]string{"foo": "bar"}, Annotations: map[string]string{"x": "y"}, + // OwnerReferences are tested separately }, } accessor := meta.NewAccessor() @@ -418,15 +426,16 @@ func TestGenericObjectMeta(t *testing.T) { APIVersion string `json:"apiVersion,omitempty"` } type ObjectMeta struct { - Namespace string `json:"namespace,omitempty"` - Name string `json:"name,omitempty"` - GenerateName string `json:"generateName,omitempty"` - UID string `json:"uid,omitempty"` - CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"` - SelfLink string `json:"selfLink,omitempty"` - ResourceVersion string `json:"resourceVersion,omitempty"` - Labels map[string]string `json:"labels,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + GenerateName string `json:"generateName,omitempty"` + UID string `json:"uid,omitempty"` + CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"` + SelfLink string `json:"selfLink,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"` } type Object struct { TypeMeta `json:",inline"` @@ -722,6 +731,66 @@ func TestTypeMetaSelfLinker(t *testing.T) { } } +type MyAPIObject2 struct { + unversioned.TypeMeta + v1.ObjectMeta +} + +func getObjectMetaAndOwnerRefereneces() (myAPIObject2 MyAPIObject2, metaOwnerReferences []metatypes.OwnerReference) { + fuzz.New().NilChance(.5).NumElements(1, 5).Fuzz(&myAPIObject2) + references := myAPIObject2.ObjectMeta.OwnerReferences + // This is necessary for the test to pass because the getter will return a + // non-nil slice. + metaOwnerReferences = make([]metatypes.OwnerReference, 0) + for i := 0; i < len(references); i++ { + metaOwnerReferences = append(metaOwnerReferences, metatypes.OwnerReference{ + Kind: references[i].Kind, + Name: references[i].Name, + UID: references[i].UID, + APIVersion: references[i].APIVersion, + }) + } + if len(references) == 0 { + // This is necessary for the test to pass because the setter will make a + // non-nil slice. + myAPIObject2.ObjectMeta.OwnerReferences = make([]v1.OwnerReference, 0) + } + return myAPIObject2, metaOwnerReferences +} + +func testGetOwnerReferences(t *testing.T) { + obj, expected := getObjectMetaAndOwnerRefereneces() + accessor, err := meta.Accessor(&obj) + if err != nil { + t.Error(err) + } + references := accessor.GetOwnerReferences() + if !reflect.DeepEqual(references, expected) { + t.Errorf("expect %#v\n got %#v", expected, references) + } +} + +func testSetOwnerReferences(t *testing.T) { + expected, references := getObjectMetaAndOwnerRefereneces() + obj := MyAPIObject2{} + accessor, err := meta.Accessor(&obj) + if err != nil { + t.Error(err) + } + accessor.SetOwnerReferences(references) + if e, a := expected.ObjectMeta.OwnerReferences, obj.ObjectMeta.OwnerReferences; !reflect.DeepEqual(e, a) { + t.Errorf("expect %#v\n got %#v", e, a) + } +} + +func TestAccessOwnerReferences(t *testing.T) { + fuzzIter := 5 + for i := 0; i < fuzzIter; i++ { + testGetOwnerReferences(t) + testSetOwnerReferences(t) + } +} + // BenchmarkAccessorSetFastPath shows the interface fast path func BenchmarkAccessorSetFastPath(b *testing.B) { obj := &api.Pod{ diff --git a/pkg/api/meta/metatypes/deep_copy_generated.go b/pkg/api/meta/metatypes/deep_copy_generated.go new file mode 100644 index 00000000000..f06a194e54a --- /dev/null +++ b/pkg/api/meta/metatypes/deep_copy_generated.go @@ -0,0 +1,33 @@ +// +build !ignore_autogenerated + +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was autogenerated by deepcopy-gen. Do not edit it manually! + +package metatypes + +import ( + conversion "k8s.io/kubernetes/pkg/conversion" +) + +func DeepCopy_metatypes_OwnerReference(in OwnerReference, out *OwnerReference, c *conversion.Cloner) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.UID = in.UID + out.Name = in.Name + return nil +} diff --git a/pkg/api/meta/metatypes/types.go b/pkg/api/meta/metatypes/types.go new file mode 100644 index 00000000000..ca4edf72e28 --- /dev/null +++ b/pkg/api/meta/metatypes/types.go @@ -0,0 +1,29 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The types defined in this package are used by the meta package to represent +// the in-memory version of objects. We cannot reuse the __internal version of +// API objects because it causes import cycle. +package metatypes + +import "k8s.io/kubernetes/pkg/types" + +type OwnerReference struct { + APIVersion string + Kind string + UID types.UID + Name string +} diff --git a/pkg/api/meta_test.go b/pkg/api/meta_test.go index e4a9657a0b5..cbac57e88ce 100644 --- a/pkg/api/meta_test.go +++ b/pkg/api/meta_test.go @@ -17,10 +17,14 @@ limitations under the License. package api_test import ( + "reflect" "testing" + "github.com/google/gofuzz" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/api/meta/metatypes" ) var _ meta.Object = &api.ObjectMeta{} @@ -49,3 +53,46 @@ func TestHasObjectMetaSystemFieldValues(t *testing.T) { t.Errorf("the resource does have all fields populated, but incorrectly reports it does not") } } + +func getObjectMetaAndOwnerReferences() (objectMeta api.ObjectMeta, metaOwnerReferences []metatypes.OwnerReference) { + fuzz.New().NilChance(.5).NumElements(1, 5).Fuzz(&objectMeta) + references := objectMeta.OwnerReferences + metaOwnerReferences = make([]metatypes.OwnerReference, 0) + for i := 0; i < len(references); i++ { + metaOwnerReferences = append(metaOwnerReferences, metatypes.OwnerReference{ + Kind: references[i].Kind, + Name: references[i].Name, + UID: references[i].UID, + APIVersion: references[i].APIVersion, + }) + } + if len(references) == 0 { + objectMeta.OwnerReferences = make([]api.OwnerReference, 0) + } + return objectMeta, metaOwnerReferences +} + +func testGetOwnerReferences(t *testing.T) { + meta, expected := getObjectMetaAndOwnerReferences() + refs := meta.GetOwnerReferences() + if !reflect.DeepEqual(refs, expected) { + t.Errorf("expect %v\n got %v", expected, refs) + } +} + +func testSetOwnerReferences(t *testing.T) { + expected, newRefs := getObjectMetaAndOwnerReferences() + objectMeta := &api.ObjectMeta{} + objectMeta.SetOwnerReferences(newRefs) + if !reflect.DeepEqual(expected.OwnerReferences, objectMeta.OwnerReferences) { + t.Errorf("expect: %#v\n got: %#v", expected.OwnerReferences, objectMeta.OwnerReferences) + } +} + +func TestAccessOwnerReferences(t *testing.T) { + fuzzIter := 5 + for i := 0; i < fuzzIter; i++ { + testGetOwnerReferences(t) + testSetOwnerReferences(t) + } +} diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 6f5658f9c76..e4d467534d7 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -292,11 +292,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } - metaInterface, err := meta.Accessor(list) + listMetaInterface, err := meta.ListAccessor(list) if err != nil { - return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list) + return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) } - resourceVersion = metaInterface.GetResourceVersion() + resourceVersion = listMetaInterface.GetResourceVersion() items, err := meta.ExtractList(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) diff --git a/pkg/client/typed/discovery/discovery_client.go b/pkg/client/typed/discovery/discovery_client.go index 4a0f0b91ac3..283dd5a63e8 100644 --- a/pkg/client/typed/discovery/discovery_client.go +++ b/pkg/client/typed/discovery/discovery_client.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "net/url" + "strings" "github.com/emicklei/go-restful/swagger" @@ -30,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/serializer" + utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/version" ) @@ -55,6 +57,12 @@ type ServerResourcesInterface interface { ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) // ServerResources returns the supported resources for all groups and versions. ServerResources() (map[string]*unversioned.APIResourceList, error) + // ServerPreferredResources returns the supported resources with the version preferred by the + // server. + ServerPreferredResources() ([]unversioned.GroupVersionResource, error) + // ServerPreferredNamespacedResources returns the supported namespaced resources with the + // version preferred by the server. + ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) } // ServerVersionInterface has a method for retrieving the server's version. @@ -163,6 +171,50 @@ func (d *DiscoveryClient) ServerResources() (map[string]*unversioned.APIResource return result, nil } +// serverPreferredResources returns the supported resources with the version preferred by the +// server. If namespaced is true, only namespaced resources will be returned. +func (d *DiscoveryClient) serverPreferredResources(namespaced bool) ([]unversioned.GroupVersionResource, error) { + results := []unversioned.GroupVersionResource{} + serverGroupList, err := d.ServerGroups() + if err != nil { + return results, err + } + + allErrs := []error{} + for _, apiGroup := range serverGroupList.Groups { + preferredVersion := apiGroup.PreferredVersion + apiResourceList, err := d.ServerResourcesForGroupVersion(preferredVersion.GroupVersion) + if err != nil { + allErrs = append(allErrs, err) + continue + } + groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version} + for _, apiResource := range apiResourceList.APIResources { + // ignore the root scoped resources if "namespaced" is true. + if namespaced && !apiResource.Namespaced { + continue + } + if strings.Contains(apiResource.Name, "/") { + continue + } + results = append(results, groupVersion.WithResource(apiResource.Name)) + } + } + return results, utilerrors.NewAggregate(allErrs) +} + +// ServerPreferredResources returns the supported resources with the version preferred by the +// server. +func (d *DiscoveryClient) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) { + return d.serverPreferredResources(false) +} + +// ServerPreferredNamespacedResources returns the supported namespaced resources with the +// version preferred by the server. +func (d *DiscoveryClient) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) { + return d.serverPreferredResources(true) +} + // ServerVersion retrieves and parses the server's version (git version). func (d *DiscoveryClient) ServerVersion() (*version.Info, error) { body, err := d.Get().AbsPath("/version").Do().Raw() diff --git a/pkg/client/typed/discovery/fake/discovery.go b/pkg/client/typed/discovery/fake/discovery.go index 0a2bb9af9d1..1c230acf02a 100644 --- a/pkg/client/typed/discovery/fake/discovery.go +++ b/pkg/client/typed/discovery/fake/discovery.go @@ -46,6 +46,14 @@ func (c *FakeDiscovery) ServerResources() (map[string]*unversioned.APIResourceLi return c.Resources, nil } +func (c *FakeDiscovery) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) { + return nil, nil +} + +func (c *FakeDiscovery) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) { + return nil, nil +} + func (c *FakeDiscovery) ServerGroups() (*unversioned.APIGroupList, error) { return nil, nil } diff --git a/pkg/client/unversioned/testclient/testclient.go b/pkg/client/unversioned/testclient/testclient.go index 0155da8a904..437e2c1ac57 100644 --- a/pkg/client/unversioned/testclient/testclient.go +++ b/pkg/client/unversioned/testclient/testclient.go @@ -382,6 +382,14 @@ type FakeDiscovery struct { *Fake } +func (c *FakeDiscovery) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) { + return nil, nil +} + +func (c *FakeDiscovery) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) { + return nil, nil +} + func (c *FakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) { action := ActionImpl{ Verb: "get", diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go new file mode 100644 index 00000000000..8d8044f7280 --- /dev/null +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -0,0 +1,506 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package garbagecollector + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/api/meta/metatypes" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/typed/dynamic" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ResourceResyncTime = 60 * time.Second + +type monitor struct { + store cache.Store + controller *framework.Controller +} + +type objectReference struct { + metatypes.OwnerReference + // This is needed by the dynamic client + Namespace string +} + +func (s objectReference) String() string { + return fmt.Sprintf("[%s/%s, namespace: %s, name: %s, uid: %s]", s.APIVersion, s.Kind, s.Namespace, s.Name, s.UID) +} + +// node does not require a lock to protect. The single-threaded +// Propagator.processEvent() is the sole writer of the nodes. The multi-threaded +// GarbageCollector.processItem() reads the nodes, but it only reads the fields +// that never get changed by Propagator.processEvent(). +type node struct { + identity objectReference + dependents map[*node]struct{} + // When processing an Update event, we need to compare the updated + // ownerReferences with the owners recorded in the graph. + owners []metatypes.OwnerReference +} + +type eventType int + +const ( + addEvent eventType = iota + updateEvent + deleteEvent +) + +type event struct { + eventType eventType + obj interface{} + // the update event comes with an old object, but it's not used by the garbage collector. + oldObj interface{} +} + +type Propagator struct { + eventQueue *workqueue.Type + // uidToNode doesn't require a lock to protect, because only the + // single-threaded Propagator.processEvent() reads/writes it. + uidToNode map[types.UID]*node + gc *GarbageCollector +} + +// addDependentToOwners adds n to owners' dependents list. If the owner does not +// exist in the p.uidToNode yet, a "virtual" node will be created to represent +// the owner. The "virtual" node will be enqueued to the dirtyQueue, so that +// processItem() will verify if the owner exists according to the API server. +func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerReference) { + for _, owner := range owners { + ownerNode, ok := p.uidToNode[owner.UID] + if !ok { + // Create a "virtual" node in the graph for the owner if it doesn't + // exist in the graph yet. Then enqueue the virtual node into the + // dirtyQueue. The garbage processor will enqueue a virtual delete + // event to delete it from the graph if API server confirms this + // owner doesn't exist. + ownerNode = &node{ + identity: objectReference{ + OwnerReference: owner, + Namespace: n.identity.Namespace, + }, + dependents: make(map[*node]struct{}), + } + p.uidToNode[ownerNode.identity.UID] = ownerNode + p.gc.dirtyQueue.Add(ownerNode) + } + ownerNode.dependents[n] = struct{}{} + } +} + +// insertNode insert the node to p.uidToNode; then it finds all owners as listed +// in n.owners, and adds the node to their dependents list. +func (p *Propagator) insertNode(n *node) { + p.uidToNode[n.identity.UID] = n + p.addDependentToOwners(n, n.owners) +} + +// removeDependentFromOwners remove n from owners' dependents list. +func (p *Propagator) removeDependentFromOwners(n *node, owners []metatypes.OwnerReference) { + for _, owner := range owners { + ownerNode, ok := p.uidToNode[owner.UID] + if !ok { + continue + } + delete(ownerNode.dependents, n) + } +} + +// removeNode removes the node from p.uidToNode, then finds all +// owners as listed in n.owners, and removes n from their dependents list. +func (p *Propagator) removeNode(n *node) { + delete(p.uidToNode, n.identity.UID) + p.removeDependentFromOwners(n, n.owners) +} + +// TODO: profile this function to see if a naive N^2 algorithm performs better +// when the number of references is small. +func referencesDiffs(old []metatypes.OwnerReference, new []metatypes.OwnerReference) (added []metatypes.OwnerReference, removed []metatypes.OwnerReference) { + oldUIDToRef := make(map[string]metatypes.OwnerReference) + for i := 0; i < len(old); i++ { + oldUIDToRef[string(old[i].UID)] = old[i] + } + oldUIDSet := sets.StringKeySet(oldUIDToRef) + newUIDToRef := make(map[string]metatypes.OwnerReference) + for i := 0; i < len(new); i++ { + newUIDToRef[string(new[i].UID)] = new[i] + } + newUIDSet := sets.StringKeySet(newUIDToRef) + + addedUID := newUIDSet.Difference(oldUIDSet) + removedUID := oldUIDSet.Difference(newUIDSet) + + for uid := range addedUID { + added = append(added, newUIDToRef[uid]) + } + for uid := range removedUID { + removed = append(removed, oldUIDToRef[uid]) + } + return added, removed +} + +// Dequeueing an event from eventQueue, updating graph, populating dirty_queue. +func (p *Propagator) processEvent() { + key, quit := p.eventQueue.Get() + if quit { + return + } + defer p.eventQueue.Done(key) + event, ok := key.(event) + if !ok { + utilruntime.HandleError(fmt.Errorf("expect an event, got %v", key)) + return + } + obj := event.obj + accessor, err := meta.Accessor(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) + return + } + typeAccessor, err := meta.TypeAccessor(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) + return + } + glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType) + // Check if the node already exsits + existingNode, found := p.uidToNode[accessor.GetUID()] + switch { + case (event.eventType == addEvent || event.eventType == updateEvent) && !found: + newNode := &node{ + identity: objectReference{ + OwnerReference: metatypes.OwnerReference{ + APIVersion: typeAccessor.GetAPIVersion(), + Kind: typeAccessor.GetKind(), + UID: accessor.GetUID(), + Name: accessor.GetName(), + }, + Namespace: accessor.GetNamespace(), + }, + dependents: make(map[*node]struct{}), + owners: accessor.GetOwnerReferences(), + } + p.insertNode(newNode) + case (event.eventType == addEvent || event.eventType == updateEvent) && found: + // TODO: finalizer: Check if ObjectMeta.DeletionTimestamp is updated from nil to non-nil + // We only need to add/remove owner refs for now + added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences()) + if len(added) == 0 && len(removed) == 0 { + glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event) + return + } + // update the node itself + existingNode.owners = accessor.GetOwnerReferences() + // Add the node to its new owners' dependent lists. + p.addDependentToOwners(existingNode, added) + // remove the node from the dependent list of node that are no long in + // the node's owners list. + p.removeDependentFromOwners(existingNode, removed) + case event.eventType == deleteEvent: + if !found { + glog.V(6).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID()) + return + } + p.removeNode(existingNode) + for dep := range existingNode.dependents { + p.gc.dirtyQueue.Add(dep) + } + } +} + +// GarbageCollector is responsible for carrying out cascading deletion, and +// removing ownerReferences from the dependents if the owner is deleted with +// DeleteOptions.OrphanDependents=true. +type GarbageCollector struct { + restMapper meta.RESTMapper + clientPool dynamic.ClientPool + dirtyQueue *workqueue.Type + monitors []monitor + propagator *Propagator +} + +func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) { + // TODO: consider store in one storage. + glog.V(6).Infof("create storage for resource %s", resource) + var monitor monitor + client, err := clientPool.ClientForGroupVersion(resource.GroupVersion()) + if err != nil { + return monitor, err + } + monitor.store, monitor.controller = framework.NewInformer( + // TODO: make special List and Watch function that removes fields other + // than ObjectMeta. + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + // APIResource.Kind is not used by the dynamic client, so + // leave it empty. We want to list this resource in all + // namespaces if it's namespace scoped, so leave + // APIResource.Namespaced as false is all right. + apiResource := unversioned.APIResource{Name: resource.Resource} + return client.Resource(&apiResource, api.NamespaceAll).List(&options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + // APIResource.Kind is not used by the dynamic client, so + // leave it empty. We want to list this resource in all + // namespaces if it's namespace scoped, so leave + // APIResource.Namespaced as false is all right. + apiResource := unversioned.APIResource{Name: resource.Resource} + return client.Resource(&apiResource, api.NamespaceAll).Watch(&options) + }, + }, + nil, + ResourceResyncTime, + framework.ResourceEventHandlerFuncs{ + // add the event to the propagator's eventQueue. + AddFunc: func(obj interface{}) { + event := event{ + eventType: addEvent, + obj: obj, + } + p.eventQueue.Add(event) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + event := event{updateEvent, newObj, oldObj} + p.eventQueue.Add(event) + }, + DeleteFunc: func(obj interface{}) { + // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it + if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = deletedFinalStateUnknown.Obj + } + event := event{ + eventType: deleteEvent, + obj: obj, + } + p.eventQueue.Add(event) + }, + }, + ) + return monitor, nil +} + +var ignoredResources = map[unversioned.GroupVersionResource]struct{}{ + unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicationcontrollers"}: {}, + unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "bindings"}: {}, + unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "componentstatuses"}: {}, + unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}: {}, +} + +func NewGarbageCollector(clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) { + gc := &GarbageCollector{ + clientPool: clientPool, + dirtyQueue: workqueue.New(), + // TODO: should use a dynamic RESTMapper built from the discovery results. + restMapper: registered.RESTMapper(), + } + gc.propagator = &Propagator{ + eventQueue: workqueue.New(), + uidToNode: make(map[types.UID]*node), + gc: gc, + } + for _, resource := range resources { + if _, ok := ignoredResources[resource]; ok { + glog.V(6).Infof("ignore resource %#v", resource) + continue + } + monitor, err := monitorFor(gc.propagator, gc.clientPool, resource) + if err != nil { + return nil, err + } + gc.monitors = append(gc.monitors, monitor) + } + return gc, nil +} + +func (gc *GarbageCollector) worker() { + key, quit := gc.dirtyQueue.Get() + if quit { + return + } + defer gc.dirtyQueue.Done(key) + err := gc.processItem(key.(*node)) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Error syncing item %v: %v", key, err)) + } +} + +// apiResource consults the REST mapper to translate an tuple to a unversioned.APIResource struct. +func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool) (*unversioned.APIResource, error) { + fqKind := unversioned.FromAPIVersionAndKind(apiVersion, kind) + mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), apiVersion) + if err != nil { + return nil, fmt.Errorf("unable to get REST mapping for kind: %s, version: %s", kind, apiVersion) + } + glog.V(6).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource) + resource := unversioned.APIResource{ + Name: mapping.Resource, + Namespaced: namespaced, + Kind: kind, + } + return &resource, nil +} + +func (gc *GarbageCollector) deleteObject(item objectReference) error { + fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) + client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) + resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) + if err != nil { + return err + } + uid := item.UID + preconditions := v1.Preconditions{UID: &uid} + deleteOptions := v1.DeleteOptions{Preconditions: &preconditions} + return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions) +} + +func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) { + fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) + client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) + resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) + if err != nil { + return nil, err + } + return client.Resource(resource, item.Namespace).Get(item.Name) +} + +func objectReferenceToUnstructured(ref objectReference) *runtime.Unstructured { + ret := &runtime.Unstructured{} + ret.SetKind(ref.Kind) + ret.SetAPIVersion(ref.APIVersion) + ret.SetUID(ref.UID) + ret.SetNamespace(ref.Namespace) + ret.SetName(ref.Name) + return ret +} + +func (gc *GarbageCollector) processItem(item *node) error { + // Get the latest item from the API server + latest, err := gc.getObject(item.identity) + if err != nil { + if errors.IsNotFound(err) { + // the Propagator can add "virtual" node for an owner that doesn't + // exist yet, so we need to enqueue a virtual Delete event to remove + // the virtual node from Propagator.uidToNode. + glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity) + event := event{ + eventType: deleteEvent, + obj: objectReferenceToUnstructured(item.identity), + } + gc.propagator.eventQueue.Add(event) + return nil + } + return err + } + if latest.GetUID() != item.identity.UID { + glog.V(6).Infof("UID doesn't match, item %v not found, ignore it", item.identity) + return nil + } + ownerReferences := latest.GetOwnerReferences() + if len(ownerReferences) == 0 { + glog.V(6).Infof("object %s's doesn't have an owner, continue on next item", item.identity) + return nil + } + // TODO: we need to remove dangling references if the object is not to be + // deleted. + for _, reference := range ownerReferences { + // TODO: we need to verify the reference resource is supported by the + // system. If it's not a valid resource, the garbage collector should i) + // ignore the reference when decide if the object should be deleted, and + // ii) should update the object to remove such references. This is to + // prevent objects having references to an old resource from being + // deleted during a cluster upgrade. + fqKind := unversioned.FromAPIVersionAndKind(reference.APIVersion, reference.Kind) + client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) + if err != nil { + return err + } + resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0) + if err != nil { + return err + } + owner, err := client.Resource(resource, item.identity.Namespace).Get(reference.Name) + if err == nil { + if owner.GetUID() != reference.UID { + glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + continue + } + glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID) + return nil + } else if errors.IsNotFound(err) { + glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + } else { + return err + } + } + glog.V(2).Infof("none of object %s's owners exist any more, will garbage collect it", item.identity) + return gc.deleteObject(item.identity) +} + +func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { + for _, monitor := range gc.monitors { + go monitor.controller.Run(stopCh) + } + + // worker + go wait.Until(gc.propagator.processEvent, 0, stopCh) + + for i := 0; i < workers; i++ { + go wait.Until(gc.worker, 0, stopCh) + } + <-stopCh + glog.Infof("Shutting down garbage collector") + gc.dirtyQueue.ShutDown() + gc.propagator.eventQueue.ShutDown() +} + +// QueueDrained returns if the dirtyQueue and eventQueue are drained. It's +// useful for debugging. +func (gc *GarbageCollector) QueuesDrained() bool { + return gc.dirtyQueue.Len() == 0 && gc.propagator.eventQueue.Len() == 0 +} + +// *FOR TEST USE ONLY* It's not safe to call this function when the GC is still +// busy. +// GraphHasUID returns if the Propagator has a particular UID store in its +// uidToNode graph. It's useful for debugging. +func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool { + for _, u := range UIDs { + if _, ok := gc.propagator.uidToNode[u]; ok { + return true + } + } + return false +} diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go new file mode 100644 index 00000000000..a1a935858c8 --- /dev/null +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -0,0 +1,287 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package garbagecollector + +import ( + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + + _ "k8s.io/kubernetes/pkg/api/install" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta/metatypes" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/dynamic" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/json" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/workqueue" +) + +func TestNewGarbageCollector(t *testing.T) { + clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc) + podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}} + gc, err := NewGarbageCollector(clientPool, podResource) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 1, len(gc.monitors)) +} + +// fakeAction records information about requests to aid in testing. +type fakeAction struct { + method string + path string +} + +// String returns method=path to aid in testing +func (f *fakeAction) String() string { + return strings.Join([]string{f.method, f.path}, "=") +} + +type FakeResponse struct { + statusCode int + content []byte +} + +// fakeActionHandler holds a list of fakeActions received +type fakeActionHandler struct { + // statusCode and content returned by this handler for different method + path. + response map[string]FakeResponse + + lock sync.Mutex + actions []fakeAction +} + +// ServeHTTP logs the action that occurred and always returns the associated status code +func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { + f.lock.Lock() + defer f.lock.Unlock() + + f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path}) + fakeResponse, ok := f.response[request.Method+request.URL.Path] + if !ok { + fakeResponse.statusCode = 200 + fakeResponse.content = []byte("{\"kind\": \"List\"}") + } + response.WriteHeader(fakeResponse.statusCode) + response.Write(fakeResponse.content) +} + +// testServerAndClientConfig returns a server that listens and a config that can reference it +func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *restclient.Config) { + srv := httptest.NewServer(http.HandlerFunc(handler)) + config := &restclient.Config{ + Host: srv.URL, + } + return srv, config +} + +func newDanglingPod() *v1.Pod { + return &v1.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "ToBeDeletedPod", + Namespace: "ns1", + OwnerReferences: []v1.OwnerReference{ + { + Kind: "ReplicationController", + Name: "owner1", + UID: "123", + APIVersion: "v1", + }, + }, + }, + } +} + +// test the processItem function making the expected actions. +func TestProcessItem(t *testing.T) { + pod := newDanglingPod() + podBytes, err := json.Marshal(pod) + if err != nil { + t.Fatal(err) + } + testHandler := &fakeActionHandler{ + response: map[string]FakeResponse{ + "GET" + "/api/v1/namespaces/ns1/replicationcontrollers/owner1": { + 404, + []byte{}, + }, + "GET" + "/api/v1/namespaces/ns1/pods/ToBeDeletedPod": { + 200, + podBytes, + }, + }, + } + podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}} + srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) + defer srv.Close() + clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc) + gc, err := NewGarbageCollector(clientPool, podResource) + if err != nil { + t.Fatal(err) + } + item := &node{ + identity: objectReference{ + OwnerReference: metatypes.OwnerReference{ + Kind: pod.Kind, + APIVersion: pod.APIVersion, + Name: pod.Name, + UID: pod.UID, + }, + Namespace: pod.Namespace, + }, + // owners are intentionally left empty. The processItem routine should get the latest item from the server. + owners: nil, + } + err = gc.processItem(item) + if err != nil { + t.Errorf("Unexpected Error: %v", err) + } + expectedActionSet := sets.NewString() + expectedActionSet.Insert("GET=/api/v1/namespaces/ns1/replicationcontrollers/owner1") + expectedActionSet.Insert("DELETE=/api/v1/namespaces/ns1/pods/ToBeDeletedPod") + expectedActionSet.Insert("GET=/api/v1/namespaces/ns1/pods/ToBeDeletedPod") + + actualActionSet := sets.NewString() + for _, action := range testHandler.actions { + actualActionSet.Insert(action.String()) + } + if !expectedActionSet.Equal(actualActionSet) { + t.Errorf("expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, + actualActionSet, expectedActionSet.Difference(actualActionSet)) + } +} + +// verifyGraphInvariants verifies that all of a node's owners list the node as a +// dependent and vice versa. uidToNode has all the nodes in the graph. +func verifyGraphInvariants(scenario string, uidToNode map[types.UID]*node, t *testing.T) { + for myUID, node := range uidToNode { + for dependentNode := range node.dependents { + found := false + for _, owner := range dependentNode.owners { + if owner.UID == myUID { + found = true + break + } + } + if !found { + t.Errorf("scenario: %s: node %s has node %s as a dependent, but it's not present in the latter node's owners list", scenario, node.identity, dependentNode.identity) + } + } + + for _, owner := range node.owners { + ownerNode, ok := uidToNode[owner.UID] + if !ok { + // It's possible that the owner node doesn't exist + continue + } + if _, ok := ownerNode.dependents[node]; !ok { + t.Errorf("node %s has node %s as an owner, but it's not present in the latter node's dependents list", node.identity, ownerNode.identity) + } + } + } +} + +func createEvent(eventType eventType, selfUID string, owners []string) event { + var ownerReferences []api.OwnerReference + for i := 0; i < len(owners); i++ { + ownerReferences = append(ownerReferences, api.OwnerReference{UID: types.UID(owners[i])}) + } + return event{ + eventType: eventType, + obj: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: types.UID(selfUID), + OwnerReferences: ownerReferences, + }, + }, + } +} + +func TestProcessEvent(t *testing.T) { + var testScenarios = []struct { + name string + // a series of events that will be supplied to the + // Propagator.eventQueue. + events []event + }{ + { + name: "test1", + events: []event{ + createEvent(addEvent, "1", []string{}), + createEvent(addEvent, "2", []string{"1"}), + createEvent(addEvent, "3", []string{"1", "2"}), + }, + }, + { + name: "test2", + events: []event{ + createEvent(addEvent, "1", []string{}), + createEvent(addEvent, "2", []string{"1"}), + createEvent(addEvent, "3", []string{"1", "2"}), + createEvent(addEvent, "4", []string{"2"}), + createEvent(deleteEvent, "2", []string{"doesn't matter"}), + }, + }, + { + name: "test3", + events: []event{ + createEvent(addEvent, "1", []string{}), + createEvent(addEvent, "2", []string{"1"}), + createEvent(addEvent, "3", []string{"1", "2"}), + createEvent(addEvent, "4", []string{"3"}), + createEvent(updateEvent, "2", []string{"4"}), + }, + }, + { + name: "reverse test2", + events: []event{ + createEvent(addEvent, "4", []string{"2"}), + createEvent(addEvent, "3", []string{"1", "2"}), + createEvent(addEvent, "2", []string{"1"}), + createEvent(addEvent, "1", []string{}), + createEvent(deleteEvent, "2", []string{"doesn't matter"}), + }, + }, + } + + for _, scenario := range testScenarios { + propagator := &Propagator{ + eventQueue: workqueue.New(), + uidToNode: make(map[types.UID]*node), + gc: &GarbageCollector{ + dirtyQueue: workqueue.New(), + }, + } + for i := 0; i < len(scenario.events); i++ { + propagator.eventQueue.Add(scenario.events[i]) + propagator.processEvent() + verifyGraphInvariants(scenario.name, propagator.uidToNode, t) + } + } +} diff --git a/pkg/controller/namespace/namespace_controller_utils.go b/pkg/controller/namespace/namespace_controller_utils.go index a48d4091219..9971e3ee5ef 100644 --- a/pkg/controller/namespace/namespace_controller_utils.go +++ b/pkg/controller/namespace/namespace_controller_utils.go @@ -18,7 +18,6 @@ package namespace import ( "fmt" - "strings" "time" "k8s.io/kubernetes/pkg/api" @@ -26,10 +25,8 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/client/typed/discovery" "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/runtime" - utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" "github.com/golang/glog" @@ -452,33 +449,3 @@ func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns strin } return estimate, nil } - -// ServerPreferredNamespacedGroupVersionResources uses the specified client to discover the set of preferred groupVersionResources that are namespaced -func ServerPreferredNamespacedGroupVersionResources(discoveryClient discovery.DiscoveryInterface) ([]unversioned.GroupVersionResource, error) { - results := []unversioned.GroupVersionResource{} - serverGroupList, err := discoveryClient.ServerGroups() - if err != nil { - return results, err - } - - allErrs := []error{} - for _, apiGroup := range serverGroupList.Groups { - preferredVersion := apiGroup.PreferredVersion - apiResourceList, err := discoveryClient.ServerResourcesForGroupVersion(preferredVersion.GroupVersion) - if err != nil { - allErrs = append(allErrs, err) - continue - } - groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version} - for _, apiResource := range apiResourceList.APIResources { - if !apiResource.Namespaced { - continue - } - if strings.Contains(apiResource.Name, "/") { - continue - } - results = append(results, groupVersion.WithResource(apiResource.Name)) - } - } - return results, utilerrors.NewAggregate(allErrs) -} diff --git a/pkg/runtime/helper.go b/pkg/runtime/helper.go index 4606ddea7cd..dca37b8f4eb 100644 --- a/pkg/runtime/helper.go +++ b/pkg/runtime/helper.go @@ -68,6 +68,47 @@ func UnsafeObjectConvertor(scheme *Scheme) ObjectConvertor { return unsafeObjectConvertor{scheme} } +// SetField puts the value of src, into fieldName, which must be a member of v. +// The value of src must be assignable to the field. +func SetField(src interface{}, v reflect.Value, fieldName string) error { + field := v.FieldByName(fieldName) + if !field.IsValid() { + return fmt.Errorf("couldn't find %v field in %#v", fieldName, v.Interface()) + } + srcValue := reflect.ValueOf(src) + if srcValue.Type().AssignableTo(field.Type()) { + field.Set(srcValue) + return nil + } + if srcValue.Type().ConvertibleTo(field.Type()) { + field.Set(srcValue.Convert(field.Type())) + return nil + } + return fmt.Errorf("couldn't assign/convert %v to %v", srcValue.Type(), field.Type()) +} + +// Field puts the value of fieldName, which must be a member of v, into dest, +// which must be a variable to which this field's value can be assigned. +func Field(v reflect.Value, fieldName string, dest interface{}) error { + field := v.FieldByName(fieldName) + if !field.IsValid() { + return fmt.Errorf("couldn't find %v field in %#v", fieldName, v.Interface()) + } + destValue, err := conversion.EnforcePtr(dest) + if err != nil { + return err + } + if field.Type().AssignableTo(destValue.Type()) { + destValue.Set(field) + return nil + } + if field.Type().ConvertibleTo(destValue.Type()) { + destValue.Set(field.Convert(destValue.Type())) + return nil + } + return fmt.Errorf("couldn't assign/convert %v to %v", field.Type(), destValue.Type()) +} + // fieldPtr puts the address of fieldName, which must be a member of v, // into dest, which must be an address of a variable to which this field's // address can be assigned. diff --git a/pkg/runtime/types.go b/pkg/runtime/types.go index 58b7843ffbc..d50f6428908 100644 --- a/pkg/runtime/types.go +++ b/pkg/runtime/types.go @@ -17,6 +17,11 @@ limitations under the License. package runtime import ( + "fmt" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api/meta/metatypes" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/types" ) @@ -196,6 +201,71 @@ func (u *Unstructured) setNestedMap(value map[string]string, fields ...string) { setNestedMap(u.Object, value, fields...) } +func extractOwnerReference(src interface{}) metatypes.OwnerReference { + v := src.(map[string]interface{}) + return metatypes.OwnerReference{ + Kind: getNestedString(v, "kind"), + Name: getNestedString(v, "name"), + APIVersion: getNestedString(v, "apiVersion"), + UID: (types.UID)(getNestedString(v, "uid")), + } +} + +func setOwnerReference(src metatypes.OwnerReference) map[string]interface{} { + ret := make(map[string]interface{}) + setNestedField(ret, src.Kind, "kind") + setNestedField(ret, src.Name, "name") + setNestedField(ret, src.APIVersion, "apiVersion") + setNestedField(ret, string(src.UID), "uid") + return ret +} + +func getOwnerReferences(object map[string]interface{}) ([]map[string]interface{}, error) { + field := getNestedField(object, "metadata", "ownerReferences") + if field == nil { + return nil, fmt.Errorf("cannot find field metadata.ownerReferences in %v", object) + } + ownerReferences, ok := field.([]map[string]interface{}) + if ok { + return ownerReferences, nil + } + // TODO: This is hacky... + interfaces, ok := field.([]interface{}) + if !ok { + return nil, fmt.Errorf("expect metadata.ownerReferences to be a slice in %#v", object) + } + ownerReferences = make([]map[string]interface{}, 0, len(interfaces)) + for i := 0; i < len(interfaces); i++ { + r, ok := interfaces[i].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("expect element metadata.ownerReferences to be a map[string]interface{} in %#v", object) + } + ownerReferences = append(ownerReferences, r) + } + return ownerReferences, nil +} + +func (u *Unstructured) GetOwnerReferences() []metatypes.OwnerReference { + original, err := getOwnerReferences(u.Object) + if err != nil { + glog.V(6).Info(err) + return nil + } + ret := make([]metatypes.OwnerReference, 0, len(original)) + for i := 0; i < len(original); i++ { + ret = append(ret, extractOwnerReference(original[i])) + } + return ret +} + +func (u *Unstructured) SetOwnerReferences(references []metatypes.OwnerReference) { + var newReferences = make([]map[string]interface{}, 0, len(references)) + for i := 0; i < len(references); i++ { + newReferences = append(newReferences, setOwnerReference(references[i])) + } + u.setNestedField(newReferences, "metadata", "ownerReferences") +} + func (u *Unstructured) GetAPIVersion() string { return getNestedString(u.Object, "apiVersion") } diff --git a/pkg/runtime/unstructured.go b/pkg/runtime/unstructured.go index 24fe1357ac3..694e2a56a7c 100644 --- a/pkg/runtime/unstructured.go +++ b/pkg/runtime/unstructured.go @@ -19,6 +19,7 @@ package runtime import ( gojson "encoding/json" "io" + "strings" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/util/json" @@ -128,6 +129,12 @@ func (s unstructuredJSONScheme) decodeToList(data []byte, list *UnstructuredList return err } + // For typed lists, e.g., a PodList, API server doesn't set each item's + // APIVersion and Kind. We need to set it. + listAPIVersion := list.GetAPIVersion() + listKind := list.GetKind() + itemKind := strings.TrimSuffix(listKind, "List") + delete(list.Object, "items") list.Items = nil for _, i := range dList.Items { @@ -135,6 +142,12 @@ func (s unstructuredJSONScheme) decodeToList(data []byte, list *UnstructuredList if err := s.decodeToUnstructured([]byte(i), unstruct); err != nil { return err } + // This is hacky. Set the item's Kind and APIVersion to those inferred + // from the List. + if len(unstruct.GetKind()) == 0 && len(unstruct.GetAPIVersion()) == 0 { + unstruct.SetKind(itemKind) + unstruct.SetAPIVersion(listAPIVersion) + } list.Items = append(list.Items, unstruct) } return nil diff --git a/pkg/runtime/unstructured_test.go b/pkg/runtime/unstructured_test.go index 155d534ff17..9cdd8b89342 100644 --- a/pkg/runtime/unstructured_test.go +++ b/pkg/runtime/unstructured_test.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta/metatypes" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/validation" @@ -141,6 +142,20 @@ func TestUnstructuredGetters(t *testing.T) { "annotations": map[string]interface{}{ "test_annotation": "test_value", }, + "ownerReferences": []map[string]interface{}{ + { + "kind": "Pod", + "name": "poda", + "apiVersion": "v1", + "uid": "1", + }, + { + "kind": "Pod", + "name": "podb", + "apiVersion": "v1", + "uid": "2", + }, + }, }, }, } @@ -192,6 +207,24 @@ func TestUnstructuredGetters(t *testing.T) { if got, want := unstruct.GetAnnotations(), map[string]string{"test_annotation": "test_value"}; !reflect.DeepEqual(got, want) { t.Errorf("GetAnnotations() = %s, want %s", got, want) } + refs := unstruct.GetOwnerReferences() + expectedOwnerReferences := []metatypes.OwnerReference{ + { + Kind: "Pod", + Name: "poda", + APIVersion: "v1", + UID: "1", + }, + { + Kind: "Pod", + Name: "podb", + APIVersion: "v1", + UID: "2", + }, + } + if got, want := refs, expectedOwnerReferences; !reflect.DeepEqual(got, want) { + t.Errorf("GetOwnerReference()=%v, want %v", got, want) + } } func TestUnstructuredSetters(t *testing.T) { @@ -216,6 +249,20 @@ func TestUnstructuredSetters(t *testing.T) { "annotations": map[string]interface{}{ "test_annotation": "test_value", }, + "ownerReferences": []map[string]interface{}{ + { + "kind": "Pod", + "name": "poda", + "apiVersion": "v1", + "uid": "1", + }, + { + "kind": "Pod", + "name": "podb", + "apiVersion": "v1", + "uid": "2", + }, + }, }, }, } @@ -233,9 +280,24 @@ func TestUnstructuredSetters(t *testing.T) { unstruct.SetDeletionTimestamp(&date) unstruct.SetLabels(map[string]string{"test_label": "test_value"}) unstruct.SetAnnotations(map[string]string{"test_annotation": "test_value"}) + newOwnerReferences := []metatypes.OwnerReference{ + { + Kind: "Pod", + Name: "poda", + APIVersion: "v1", + UID: "1", + }, + { + Kind: "Pod", + Name: "podb", + APIVersion: "v1", + UID: "2", + }, + } + unstruct.SetOwnerReferences(newOwnerReferences) if !reflect.DeepEqual(unstruct, want) { - t.Errorf("Wanted: \n%s\n Got:\n%s", unstruct, want) + t.Errorf("Wanted: \n%s\n Got:\n%s", want, unstruct) } } diff --git a/test/integration/garbage_collector_test.go b/test/integration/garbage_collector_test.go new file mode 100644 index 00000000000..cd2c87e19dc --- /dev/null +++ b/test/integration/garbage_collector_test.go @@ -0,0 +1,374 @@ +// +build integration,!no-etcd + +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strconv" + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/dynamic" + "k8s.io/kubernetes/pkg/controller/garbagecollector" + "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/test/integration/framework" +) + +const garbageCollectedPodName = "test.pod.1" +const independentPodName = "test.pod.2" +const oneValidOwnerPodName = "test.pod.3" +const toBeDeletedRCName = "test.rc.1" +const remainingRCName = "test.rc.2" + +func newPod(podName string, ownerReferences []v1.OwnerReference) *v1.Pod { + for i := 0; i < len(ownerReferences); i++ { + ownerReferences[i].Kind = "ReplicationController" + ownerReferences[i].APIVersion = "v1" + } + return &v1.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: podName, + Namespace: framework.TestNS, + OwnerReferences: ownerReferences, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + } +} + +func newOwnerRC(name string) *v1.ReplicationController { + return &v1.ReplicationController{ + TypeMeta: unversioned.TypeMeta{ + Kind: "ReplicationController", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: framework.TestNS, + Name: name, + }, + Spec: v1.ReplicationControllerSpec{ + Selector: map[string]string{"name": "test"}, + Template: &v1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{"name": "test"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + }, + }, + } +} + +func observePodDeletion(t *testing.T, w watch.Interface) (deletedPod *api.Pod) { + deleted := false + timeout := false + timer := time.After(60 * time.Second) + for !deleted && !timeout { + select { + case event, _ := <-w.ResultChan(): + if event.Type == watch.Deleted { + // TODO: used the commented code once we fix the client. + // deletedPod = event.Object.(*v1.Pod) + deletedPod = event.Object.(*api.Pod) + deleted = true + } + case <-timer: + timeout = true + } + } + if !deleted { + t.Fatalf("Failed to observe pod deletion") + } + return +} + +func setup(t *testing.T) (*garbagecollector.GarbageCollector, clientset.Interface) { + var m *master.Master + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + m.Handler.ServeHTTP(w, req) + })) + // TODO: close the http server + + masterConfig := framework.NewIntegrationTestMasterConfig() + masterConfig.EnableCoreControllers = false + m, err := master.New(masterConfig) + if err != nil { + t.Fatalf("Error in bringing up the master: %v", err) + } + + framework.DeleteAllEtcdKeys() + clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL}) + if err != nil { + t.Fatalf("Error in create clientset: %v", err) + } + groupVersionResources, err := clientSet.Discovery().ServerPreferredResources() + if err != nil { + t.Fatalf("Failed to get supported resources from server: %v", err) + } + clientPool := dynamic.NewClientPool(&restclient.Config{Host: s.URL}, dynamic.LegacyAPIPathResolverFunc) + gc, err := garbagecollector.NewGarbageCollector(clientPool, groupVersionResources) + if err != nil { + t.Fatalf("Failed to create garbage collector") + } + return gc, clientSet +} + +// This test simulates the cascading deletion. +func TestCascadingDeletion(t *testing.T) { + gc, clientSet := setup(t) + rcClient := clientSet.Core().ReplicationControllers(framework.TestNS) + podClient := clientSet.Core().Pods(framework.TestNS) + + toBeDeletedRC, err := rcClient.Create(newOwnerRC(toBeDeletedRCName)) + if err != nil { + t.Fatalf("Failed to create replication controller: %v", err) + } + remainingRC, err := rcClient.Create(newOwnerRC(remainingRCName)) + if err != nil { + t.Fatalf("Failed to create replication controller: %v", err) + } + + rcs, err := rcClient.List(api.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list replication controllers: %v", err) + } + if len(rcs.Items) != 2 { + t.Fatalf("Expect only 2 replication controller") + } + + // this pod should be cascadingly deleted. + pod := newPod(garbageCollectedPodName, []v1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName}}) + _, err = podClient.Create(pod) + if err != nil { + t.Fatalf("Failed to create Pod: %v", err) + } + + // this pod shouldn't be cascadingly deleted, because it has a valid referenece. + pod = newPod(oneValidOwnerPodName, []v1.OwnerReference{ + {UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName}, + {UID: remainingRC.ObjectMeta.UID, Name: remainingRCName}, + }) + _, err = podClient.Create(pod) + if err != nil { + t.Fatalf("Failed to create Pod: %v", err) + } + + // this pod shouldn't be cascadingly deleted, because it doesn't have an owner. + pod = newPod(independentPodName, []v1.OwnerReference{}) + _, err = podClient.Create(pod) + if err != nil { + t.Fatalf("Failed to create Pod: %v", err) + } + + // set up watch + pods, err := podClient.List(api.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list pods: %v", err) + } + if len(pods.Items) != 3 { + t.Fatalf("Expect only 3 pods") + } + options := api.ListOptions{ + ResourceVersion: pods.ListMeta.ResourceVersion, + } + w, err := podClient.Watch(options) + if err != nil { + t.Fatalf("Failed to set up watch: %v", err) + } + stopCh := make(chan struct{}) + go gc.Run(5, stopCh) + defer close(stopCh) + // delete one of the replication controller + if err := rcClient.Delete(toBeDeletedRCName, nil); err != nil { + t.Fatalf("failed to delete replication controller: %v", err) + } + + deletedPod := observePodDeletion(t, w) + if deletedPod == nil { + t.Fatalf("empty deletedPod") + } + if deletedPod.Name != garbageCollectedPodName { + t.Fatalf("deleted unexpected pod: %v", *deletedPod) + } + // wait for another 30 seconds to give garbage collect a chance to make mistakes. + time.Sleep(30 * time.Second) + // checks the garbage collect doesn't delete pods it shouldn't do. + if _, err := podClient.Get(independentPodName); err != nil { + t.Fatal(err) + } + if _, err := podClient.Get(oneValidOwnerPodName); err != nil { + t.Fatal(err) + } +} + +// This test simulates the case where an object is created with an owner that +// doesn't exist. It verifies the GC will delete such an object. +func TestCreateWithNonExisitentOwner(t *testing.T) { + gc, clientSet := setup(t) + podClient := clientSet.Core().Pods(framework.TestNS) + + pod := newPod(garbageCollectedPodName, []v1.OwnerReference{{UID: "doesn't matter", Name: toBeDeletedRCName}}) + _, err := podClient.Create(pod) + if err != nil { + t.Fatalf("Failed to create Pod: %v", err) + } + + // set up watch + pods, err := podClient.List(api.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list pods: %v", err) + } + if len(pods.Items) != 1 { + t.Fatalf("Expect only 1 pod") + } + options := api.ListOptions{ + ResourceVersion: pods.ListMeta.ResourceVersion, + } + w, err := podClient.Watch(options) + if err != nil { + t.Fatalf("Failed to set up watch: %v", err) + } + stopCh := make(chan struct{}) + go gc.Run(5, stopCh) + defer close(stopCh) + deletedPod := observePodDeletion(t, w) + if deletedPod == nil { + t.Fatalf("empty deletedPod") + } + if deletedPod.Name != garbageCollectedPodName { + t.Fatalf("deleted unexpected pod: %v", *deletedPod) + } +} + +func createRemoveRCsPods(t *testing.T, clientSet clientset.Interface, id int, wg *sync.WaitGroup, rcUIDs chan types.UID) { + defer wg.Done() + rcClient := clientSet.Core().ReplicationControllers(framework.TestNS) + podClient := clientSet.Core().Pods(framework.TestNS) + // create rc. + rcName := toBeDeletedRCName + strconv.Itoa(id) + toBeDeletedRC, err := rcClient.Create(newOwnerRC(rcName)) + if err != nil { + t.Fatalf("Failed to create replication controller: %v", err) + } + rcUIDs <- toBeDeletedRC.ObjectMeta.UID + // create pods. These pods should be cascadingly deleted. + for j := 0; j < 3; j++ { + podName := garbageCollectedPodName + strconv.Itoa(id) + "-" + strconv.Itoa(j) + pod := newPod(podName, []v1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: rcName}}) + _, err = podClient.Create(pod) + if err != nil { + t.Fatalf("Failed to create Pod: %v", err) + } + } + // delete the rc + if err := rcClient.Delete(rcName, nil); err != nil { + t.Fatalf("failed to delete replication controller: %v", err) + } +} + +func allObjectsRemoved(clientSet clientset.Interface) (bool, error) { + rcClient := clientSet.Core().ReplicationControllers(framework.TestNS) + podClient := clientSet.Core().Pods(framework.TestNS) + pods, err := podClient.List(api.ListOptions{}) + if err != nil { + return false, fmt.Errorf("Failed to list pods: %v", err) + } + if len(pods.Items) != 0 { + return false, nil + } + rcs, err := rcClient.List(api.ListOptions{}) + if err != nil { + return false, fmt.Errorf("Failed to list replication controllers: %v", err) + } + if len(rcs.Items) != 0 { + return false, nil + } + return true, nil +} + +// This stress test the garbage collector +func TestStressingCascadingDeletion(t *testing.T) { + t.Logf("starts garbage collector stress test") + gc, clientSet := setup(t) + stopCh := make(chan struct{}) + go gc.Run(5, stopCh) + defer close(stopCh) + + const collections = 50 + var wg sync.WaitGroup + wg.Add(collections) + rcUIDs := make(chan types.UID, collections) + for i := 0; i < collections; i++ { + go createRemoveRCsPods(t, clientSet, i, &wg, rcUIDs) + } + wg.Wait() + t.Logf("all pods are created, all replications controllers are created then deleted") + // wait for the garbage collector to drain its queue + if err := wait.Poll(10*time.Second, 300*time.Second, func() (bool, error) { + return gc.QueuesDrained(), nil + }); err != nil { + t.Fatal(err) + } + t.Logf("garbage collector queues drained") + // wait for all replication controllers and pods to be deleted. This + // shouldn't take long, because the queues are already drained. + if err := wait.Poll(5*time.Second, 30*time.Second, func() (bool, error) { + return allObjectsRemoved(clientSet) + }); err != nil { + t.Fatal(err) + } + t.Logf("all replication controllers and pods are deleted") + + // verify there is no node representing replication controllers in the gc's graph + uids := make([]types.UID, 0, collections) + for i := 0; i < collections; i++ { + uid := <-rcUIDs + uids = append(uids, uid) + } + if gc.GraphHasUID(uids) { + t.Errorf("Expect all nodes representing replication controllers are removed from the Propagator's graph") + } +}