From 29d72a71345d535e270cee59e31e8b5788910bb9 Mon Sep 17 00:00:00 2001
From: Chao Xu
Date: Tue, 28 Nov 2017 17:49:24 -0800
Subject: [PATCH] GC: fall back to JSON merge patch when SMP is not supported

---
 .../garbagecollector/garbagecollector.go      |  17 ++-
 .../garbagecollector/garbagecollector_test.go |   4 +-
 .../garbagecollector/graph_builder.go         |  15 +--
 pkg/controller/garbagecollector/operations.go |   4 +-
 pkg/controller/garbagecollector/patch.go      | 117 +++++++++++++++++-
 test/e2e/apimachinery/garbage_collector.go    | 103 +++++++++++++++
 6 files changed, 240 insertions(+), 20 deletions(-)

diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index 7f93386a86d..e0979bcc1b7 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -417,8 +417,11 @@ func (gc *GarbageCollector) attemptToDeleteItem(item *node) error {
         // waitingForDependentsDeletion needs to be deleted from the
         // ownerReferences, otherwise the referenced objects will be stuck with
         // the FinalizerDeletingDependents and never get deleted.
-        patch := deleteOwnerRefPatch(item.identity.UID, append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)...)
-        _, err = gc.patchObject(item.identity, patch)
+        ownerUIDs := append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)
+        patch := deleteOwnerRefStrategicMergePatch(item.identity.UID, ownerUIDs...)
+        _, err = gc.patch(item, patch, func(n *node) ([]byte, error) {
+            return gc.deleteOwnerRefJSONMergePatch(n, ownerUIDs...)
+        })
         return err
     case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0:
         deps := item.getDependents()
@@ -430,11 +433,11 @@ func (gc *GarbageCollector) attemptToDeleteItem(item *node) error {
             // there are multiple workers run attemptToDeleteItem in
             // parallel, the circle detection can fail in a race condition.
             glog.V(2).Infof("processing object %s, some of its owners and its dependent [%s] have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the object is going to be deleted with Foreground", item.identity, dep.identity)
-            patch, err := item.patchToUnblockOwnerReferences()
+            patch, err := item.unblockOwnerReferencesStrategicMergePatch()
             if err != nil {
                 return err
             }
-            if _, err := gc.patchObject(item.identity, patch); err != nil {
+            if _, err := gc.patch(item, patch, gc.unblockOwnerReferencesJSONMergePatch); err != nil {
                 return err
             }
             break
@@ -494,8 +497,10 @@ func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents [
         go func(dependent *node) {
             defer wg.Done()
             // the dependent.identity.UID is used as precondition
-            patch := deleteOwnerRefPatch(dependent.identity.UID, owner.UID)
-            _, err := gc.patchObject(dependent.identity, patch)
+            patch := deleteOwnerRefStrategicMergePatch(dependent.identity.UID, owner.UID)
+            _, err := gc.patch(dependent, patch, func(n *node) ([]byte, error) {
+                return gc.deleteOwnerRefJSONMergePatch(n, owner.UID)
+            })
             // note that if the target ownerReference doesn't exist in the
             // dependent, strategic merge patch will NOT return an error.
             if err != nil && !errors.IsNotFound(err) {
diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go
index 96682e4c6da..8d34b6a4246 100644
--- a/pkg/controller/garbagecollector/garbagecollector_test.go
+++ b/pkg/controller/garbagecollector/garbagecollector_test.go
@@ -593,7 +593,7 @@ func TestDeleteOwnerRefPatch(t *testing.T) {
             },
         },
     }
-    patch := deleteOwnerRefPatch("100", "2", "3")
+    patch := deleteOwnerRefStrategicMergePatch("100", "2", "3")
     patched, err := strategicpatch.StrategicMergePatch(originalData, patch, v1.Pod{})
     if err != nil {
         t.Fatal(err)
@@ -638,7 +638,7 @@ func TestUnblockOwnerReference(t *testing.T) {
     n := node{
         owners: accessor.GetOwnerReferences(),
     }
-    patch, err := n.patchToUnblockOwnerReferences()
+    patch, err := n.unblockOwnerReferencesStrategicMergePatch()
     if err != nil {
         t.Fatal(err)
     }
diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go
index 4e18b1c2678..8068e97775d 100644
--- a/pkg/controller/garbagecollector/graph_builder.go
+++ b/pkg/controller/garbagecollector/graph_builder.go
@@ -78,7 +78,7 @@ type GraphBuilder struct {
     // each monitor list/watches a resource, the results are funneled to the
     // dependencyGraphBuilder
     monitors    monitors
-    monitorLock sync.Mutex
+    monitorLock sync.RWMutex
     // informersStarted is closed after after all of the controllers have been initialized and are running.
     // After that it is safe to start them here, before that it is not.
     informersStarted <-chan struct{}
@@ -111,6 +111,7 @@ type GraphBuilder struct {
 // monitor runs a Controller with a local stop channel.
 type monitor struct {
     controller cache.Controller
+    store      cache.Store

     // stopCh stops Controller. If stopCh is nil, the monitor is considered to be
     // not yet started.
@@ -138,7 +139,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
     }
 }

-func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) {
+func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
     handlers := cache.ResourceEventHandlerFuncs{
         // add the event to the dependencyGraphBuilder's graphChanges.
         AddFunc: func(obj interface{}) {
@@ -178,21 +179,21 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
         glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
         // need to clone because it's from a shared cache
         shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
-        return shared.Informer().GetController(), nil
+        return shared.Informer().GetController(), shared.Informer().GetStore(), nil
     } else {
         glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
     }

     // TODO: consider store in one storage.
     glog.V(5).Infof("create storage for resource %s", resource)
-    _, monitor := cache.NewInformer(
+    store, monitor := cache.NewInformer(
         listWatcher(gb.dynamicClient, resource),
         nil,
         ResourceResyncTime,
         // don't need to clone because it's not from shared cache
         handlers,
     )
-    return monitor, nil
+    return monitor, store, nil
 }

 // syncMonitors rebuilds the monitor set according to the supplied resources,
@@ -228,12 +229,12 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
             errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
             continue
         }
-        c, err := gb.controllerFor(resource, kind)
+        c, s, err := gb.controllerFor(resource, kind)
         if err != nil {
             errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
             continue
         }
-        current[resource] = &monitor{controller: c}
+        current[resource] = &monitor{store: s, controller: c}
         added++
     }
     gb.monitors = current
diff --git a/pkg/controller/garbagecollector/operations.go b/pkg/controller/garbagecollector/operations.go
index 29025692b89..9f0145869c1 100644
--- a/pkg/controller/garbagecollector/operations.go
+++ b/pkg/controller/garbagecollector/operations.go
@@ -76,12 +76,12 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured
     return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Update(obj)
 }

-func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
+func (gc *GarbageCollector) patchObject(item objectReference, patch []byte, pt types.PatchType) (*unstructured.Unstructured, error) {
     resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind)
     if err != nil {
         return nil, err
     }
-    return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch)
+    return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(item.Name, pt, patch)
 }

 // TODO: Using Patch when strategicmerge supports deleting an entry from a
diff --git a/pkg/controller/garbagecollector/patch.go b/pkg/controller/garbagecollector/patch.go
index 8f8f9fb75ab..b0169adeaab 100644
--- a/pkg/controller/garbagecollector/patch.go
+++ b/pkg/controller/garbagecollector/patch.go
@@ -21,12 +21,16 @@ import (
     "fmt"
     "strings"

+    "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
 )

-func deleteOwnerRefPatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte {
+func deleteOwnerRefStrategicMergePatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte {
     var pieces []string
     for _, ownerUID := range ownerUIDs {
         pieces = append(pieces, fmt.Sprintf(`{"$patch":"delete","uid":"%s"}`, ownerUID))
@@ -35,9 +39,97 @@ func deleteOwnerRefStrategicMergePatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte
     return []byte(patch)
 }

-// generate a patch that unsets the BlockOwnerDeletion field of all
+// getMetadata tries to get object metadata from the local cache, and falls back to a GET request to the
+// API server when the local cache is not available for the resource or does not contain the object.
+func (gc *GarbageCollector) getMetadata(apiVersion, kind, namespace, name string) (metav1.Object, error) {
+    apiResource, _, err := gc.apiResource(apiVersion, kind)
+    if err != nil {
+        return nil, err
+    }
+    gc.dependencyGraphBuilder.monitorLock.RLock()
+    defer gc.dependencyGraphBuilder.monitorLock.RUnlock()
+    m, ok := gc.dependencyGraphBuilder.monitors[apiResource]
+    if !ok || m == nil {
+        // If there is no local cache for apiResource, send a GET request to the API server
+        return gc.dynamicClient.Resource(apiResource).Namespace(namespace).Get(name, metav1.GetOptions{})
+    }
+    key := name
+    if len(namespace) != 0 {
+        key = namespace + "/" + name
+    }
+    raw, exist, err := m.store.GetByKey(key)
+    if err != nil {
+        return nil, err
+    }
+    if !exist {
+        // If the local cache doesn't contain the object, send a GET request to the API server
+        return gc.dynamicClient.Resource(apiResource).Namespace(namespace).Get(name, metav1.GetOptions{})
+    }
+    obj, ok := raw.(runtime.Object)
+    if !ok {
+        return nil, fmt.Errorf("expected a runtime.Object, got %v", raw)
+    }
+    return meta.Accessor(obj)
+}
+
+type objectForPatch struct {
+    ObjectMetaForPatch `json:"metadata"`
+}
+
+type ObjectMetaForPatch struct {
+    ResourceVersion string                  `json:"resourceVersion"`
+    OwnerReferences []metav1.OwnerReference `json:"ownerReferences"`
+}
+
+// jsonMergePatchFunc defines the interface for functions that construct JSON merge patches that manipulate
+// the ownerReferences array.
+type jsonMergePatchFunc func(*node) ([]byte, error)
+
+// patch tries a strategic merge patch (SMP) on item first and, if SMP is not supported, falls back
+// to a JSON merge patch.
+func (gc *GarbageCollector) patch(item *node, smp []byte, jmp jsonMergePatchFunc) (*unstructured.Unstructured, error) {
+    smpResult, err := gc.patchObject(item.identity, smp, types.StrategicMergePatchType)
+    if err == nil {
+        return smpResult, nil
+    }
+    if !errors.IsUnsupportedMediaType(err) {
+        return nil, err
+    }
+    // StrategicMergePatch is not supported, use JSON merge patch instead
+    patch, err := jmp(item)
+    if err != nil {
+        return nil, err
+    }
+    return gc.patchObject(item.identity, patch, types.MergePatchType)
+}
+
+// deleteOwnerRefJSONMergePatch returns a JSON merge patch that removes the ownerReferences matching ownerUIDs.
+func (gc *GarbageCollector) deleteOwnerRefJSONMergePatch(item *node, ownerUIDs ...types.UID) ([]byte, error) {
+    accessor, err := gc.getMetadata(item.identity.APIVersion, item.identity.Kind, item.identity.Namespace, item.identity.Name)
+    if err != nil {
+        return nil, err
+    }
+    expectedObjectMeta := ObjectMetaForPatch{}
+    expectedObjectMeta.ResourceVersion = accessor.GetResourceVersion()
+    refs := accessor.GetOwnerReferences()
+    for _, ref := range refs {
+        var skip bool
+        for _, ownerUID := range ownerUIDs {
+            if ref.UID == ownerUID {
+                skip = true
+                break
+            }
+        }
+        if !skip {
+            expectedObjectMeta.OwnerReferences = append(expectedObjectMeta.OwnerReferences, ref)
+        }
+    }
+    return json.Marshal(objectForPatch{expectedObjectMeta})
+}
+
+// Generate a patch that unsets the BlockOwnerDeletion field of all
 // ownerReferences of node.
-func (n *node) patchToUnblockOwnerReferences() ([]byte, error) {
+func (n *node) unblockOwnerReferencesStrategicMergePatch() ([]byte, error) {
     var dummy metaonly.MetadataOnlyObject
     var blockingRefs []metav1.OwnerReference
     falseVar := false
@@ -52,3 +144,22 @@ func (n *node) patchToUnblockOwnerReferences() ([]byte, error) {
     dummy.ObjectMeta.UID = n.identity.UID
     return json.Marshal(dummy)
 }
+
+// Generate a JSON merge patch that unsets the BlockOwnerDeletion field of all
+// ownerReferences of node.
+func (gc *GarbageCollector) unblockOwnerReferencesJSONMergePatch(n *node) ([]byte, error) {
+    accessor, err := gc.getMetadata(n.identity.APIVersion, n.identity.Kind, n.identity.Namespace, n.identity.Name)
+    if err != nil {
+        return nil, err
+    }
+    expectedObjectMeta := ObjectMetaForPatch{}
+    expectedObjectMeta.ResourceVersion = accessor.GetResourceVersion()
+    var expectedOwners []metav1.OwnerReference
+    falseVar := false
+    for _, owner := range n.owners {
+        owner.BlockOwnerDeletion = &falseVar
+        expectedOwners = append(expectedOwners, owner)
+    }
+    expectedObjectMeta.OwnerReferences = expectedOwners
+    return json.Marshal(objectForPatch{expectedObjectMeta})
+}
diff --git a/test/e2e/apimachinery/garbage_collector.go b/test/e2e/apimachinery/garbage_collector.go
index f9e3c74938c..8b00c824f47 100644
--- a/test/e2e/apimachinery/garbage_collector.go
+++ b/test/e2e/apimachinery/garbage_collector.go
@@ -998,6 +998,109 @@ var _ = SIGDescribe("Garbage collector", func() {
         }
     })

+    It("should support orphan deletion of custom resources", func() {
+        config, err := framework.LoadConfig()
+        if err != nil {
+            framework.Failf("failed to load config: %v", err)
+        }
+
+        apiExtensionClient, err := apiextensionsclientset.NewForConfig(config)
+        if err != nil {
+            framework.Failf("failed to initialize apiExtensionClient: %v", err)
+        }
+
+        // Create a random custom resource definition and ensure it's available for
+        // use.
+        definition := apiextensionstestserver.NewRandomNameCustomResourceDefinition(apiextensionsv1beta1.ClusterScoped)
+        defer func() {
+            err = apiextensionstestserver.DeleteCustomResourceDefinition(definition, apiExtensionClient)
+            if err != nil && !errors.IsNotFound(err) {
+                framework.Failf("failed to delete CustomResourceDefinition: %v", err)
+            }
+        }()
+        definition, err = apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, f.DynamicClient)
+        if err != nil {
+            framework.Failf("failed to create CustomResourceDefinition: %v", err)
+        }
+
+        // Get a client for the custom resource.
+        gvr := schema.GroupVersionResource{Group: definition.Spec.Group, Version: definition.Spec.Version, Resource: definition.Spec.Names.Plural}
+        resourceClient := f.DynamicClient.Resource(gvr)
+
+        apiVersion := definition.Spec.Group + "/" + definition.Spec.Version
+
+        // Create a custom owner resource.
+        ownerName := names.SimpleNameGenerator.GenerateName("owner")
+        owner := &unstructured.Unstructured{
+            Object: map[string]interface{}{
+                "apiVersion": apiVersion,
+                "kind":       definition.Spec.Names.Kind,
+                "metadata": map[string]interface{}{
+                    "name": ownerName,
+                },
+            },
+        }
+        persistedOwner, err := resourceClient.Create(owner)
+        if err != nil {
+            framework.Failf("failed to create owner resource %q: %v", ownerName, err)
+        }
+        framework.Logf("created owner resource %q", ownerName)
+
+        // Create a custom dependent resource.
+        dependentName := names.SimpleNameGenerator.GenerateName("dependent")
+        dependent := &unstructured.Unstructured{
+            Object: map[string]interface{}{
+                "apiVersion": apiVersion,
+                "kind":       definition.Spec.Names.Kind,
+                "metadata": map[string]interface{}{
+                    "name": dependentName,
+                    "ownerReferences": []map[string]string{
+                        {
+                            "uid":        string(persistedOwner.GetUID()),
+                            "apiVersion": apiVersion,
+                            "kind":       definition.Spec.Names.Kind,
+                            "name":       ownerName,
+                        },
+                    },
+                },
+            },
+        }
+        _, err = resourceClient.Create(dependent)
+        if err != nil {
+            framework.Failf("failed to create dependent resource %q: %v", dependentName, err)
+        }
+        framework.Logf("created dependent resource %q", dependentName)
+
+        // Delete the owner and orphan the dependent.
+        err = resourceClient.Delete(ownerName, getOrphanOptions())
+        if err != nil {
+            framework.Failf("failed to delete owner resource %q: %v", ownerName, err)
+        }
+
+        By("wait for the owner to be deleted")
+        if err := wait.Poll(5*time.Second, 120*time.Second, func() (bool, error) {
+            _, err = resourceClient.Get(ownerName, metav1.GetOptions{})
+            if err == nil {
+                return false, nil
+            }
+            if err != nil && !errors.IsNotFound(err) {
+                return false, fmt.Errorf("failed to get owner: %v", err)
+            }
+            return true, nil
+        }); err != nil {
+            framework.Failf("timeout in waiting for the owner to be deleted: %v", err)
+        }
+
+        // Wait 30s and ensure the dependent is not deleted.
+        By("wait for 30 seconds to see if the garbage collector mistakenly deletes the dependent custom resource")
+        if err := wait.Poll(5*time.Second, 30*time.Second, func() (bool, error) {
+            _, err := resourceClient.Get(dependentName, metav1.GetOptions{})
+            return false, err
+        }); err != nil && err != wait.ErrWaitTimeout {
+            framework.Failf("failed to ensure the dependent is not deleted: %v", err)
+        }
+    })
+
     It("should delete jobs and pods created by cronjob", func() {
         framework.SkipIfMissingResource(f.DynamicClient, CronJobGroupVersionResource, f.Namespace.Name)
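Note on the two patch bodies involved (a standalone illustration, not part of the patch itself; the UIDs, names, and resourceVersion below are made-up sample values): a strategic merge patch can remove a single ownerReferences entry with a $patch:"delete" directive keyed by uid, while resources served without strategic merge patch support, such as custom resources, only accept JSON merge patches (RFC 7386), where a list is replaced wholesale. The fallback therefore reads the object's current metadata and sends back the complete desired ownerReferences array, together with the observed resourceVersion so the patch fails if the object changed in the meantime. A minimal sketch of both payloads using only the Go standard library:

package main

import (
    "encoding/json"
    "fmt"
)

// ownerReference mirrors the subset of metav1.OwnerReference fields used here.
type ownerReference struct {
    APIVersion string `json:"apiVersion"`
    Kind       string `json:"kind"`
    Name       string `json:"name"`
    UID        string `json:"uid"`
}

func main() {
    // Strategic merge patch: deletes one ownerReferences entry by uid via a
    // $patch:"delete" directive; the server merges it into the live object.
    smp := fmt.Sprintf(
        `{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`,
        "owner-uid-1", "dependent-uid")
    fmt.Println("strategic merge patch:", smp)

    // JSON merge patch (the fallback for resources without SMP support): lists are
    // replaced wholesale, so the client sends the complete desired ownerReferences
    // array, and includes the resourceVersion it read so a concurrent update makes
    // the patch fail instead of being silently overwritten.
    remaining := []ownerReference{
        {APIVersion: "example.com/v1", Kind: "Example", Name: "other-owner", UID: "owner-uid-2"},
    }
    jmp, err := json.Marshal(map[string]interface{}{
        "metadata": map[string]interface{}{
            "resourceVersion": "12345",
            "ownerReferences": remaining,
        },
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("json merge patch:", string(jmp))
}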