From 2103ea4ddedc59d14322fd0fc34c8a9928256551 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Mon, 16 Dec 2019 13:00:33 -0800 Subject: [PATCH 1/7] Changes Visit() to Infos() in apply to keep slice of objects --- .../src/k8s.io/kubectl/pkg/cmd/apply/apply.go | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index a1d554cad9c..b86b88de35c 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -374,10 +374,13 @@ func (o *ApplyOptions) Run() error { var objs []runtime.Object count := 0 - err = r.Visit(func(info *resource.Info, err error) error { - if err != nil { - return err - } + + infos, err := r.Infos() + if err != nil { + return err + } + + for _, info := range infos { // If server-dry-run is requested but the type doesn't support it, fail right away. if o.ServerDryRun { @@ -449,7 +452,7 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) count++ if len(output) > 0 && !shortOutput { objs = append(objs, info.Object) - return nil + continue } printer, err := o.ToPrinter("serverside-applied") @@ -457,7 +460,10 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) return err } - return printer.PrintObj(info.Object, o.Out) + if err = printer.PrintObj(info.Object, o.Out); err != nil { + return err + } + continue } // Get the modified configuration of the object. Embed the result @@ -505,14 +511,17 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) if printObject { objs = append(objs, info.Object) - return nil + continue } printer, err := o.ToPrinter("created") if err != nil { return err } - return printer.PrintObj(info.Object, o.Out) + if err = printer.PrintObj(info.Object, o.Out); err != nil { + return err + } + continue } metadata, err := meta.Accessor(info.Object) @@ -557,24 +566,26 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) if err != nil { return err } - return printer.PrintObj(info.Object, o.Out) + if err = printer.PrintObj(info.Object, o.Out); err != nil { + return err + } + continue } } count++ if printObject { objs = append(objs, info.Object) - return nil + continue } printer, err := o.ToPrinter("configured") if err != nil { return err } - return printer.PrintObj(info.Object, o.Out) - }) - if err != nil { - return err + if err = printer.PrintObj(info.Object, o.Out); err != nil { + return err + } } if count == 0 { From 02af4c9be20db201f2a2069816e97004a96bf760 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Mon, 16 Dec 2019 13:56:52 -0800 Subject: [PATCH 2/7] Created GetObjects() method for ApplyOptions and integrated into apply --- .../src/k8s.io/kubectl/pkg/cmd/apply/apply.go | 62 ++++++++++++++----- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index b86b88de35c..20efea55288 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -92,6 +92,16 @@ type ApplyOptions struct { EnforceNamespace bool genericclioptions.IOStreams + + // Objects (and some denormalized data) which are to be + // applied. The standard way to fill in this structure + // is by calling "GetObjects()", which will use the + // resource builder if "objectsCached" is false. The other + // way to set this field is to use "SetObjects()". + // Subsequent calls to "GetObjects()" after setting would + // not call the resource builder; only return the set objects. + objects []*resource.Info + objectsCached bool } const ( @@ -146,6 +156,9 @@ func NewApplyOptions(ioStreams genericclioptions.IOStreams) *ApplyOptions { Recorder: genericclioptions.NoopRecorder{}, IOStreams: ioStreams, + + objects: []*resource.Info{}, + objectsCached: false, } } @@ -330,6 +343,37 @@ func isIncompatibleServerError(err error) bool { return err.(*errors.StatusError).Status().Code == http.StatusUnsupportedMediaType } +// GetObjects returns a (possibly cached) version of all the objects to apply +// as a slice of pointer to resource.Info. The resource.Info contains the object +// and some other denormalized data. This function should not be called until +// AFTER the "complete" and "validate" methods have been called to ensure that +// the ApplyOptions is filled in and valid. +func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) { + if !o.objectsCached { + // include the uninitialized objects by default if --prune is true + // unless explicitly set --include-uninitialized=false + r := o.Builder. + Unstructured(). + Schema(o.Validator). + ContinueOnError(). + NamespaceParam(o.Namespace).DefaultNamespace(). + FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions). + LabelSelectorParam(o.Selector). + Flatten(). + Do() + if err := r.Err(); err != nil { + return nil, err + } + infos, err := r.Infos() + if err != nil { + return nil, err + } + o.objects = infos + o.objectsCached = true + } + return o.objects, nil +} + // Run executes the `apply` command. func (o *ApplyOptions) Run() error { var openapiSchema openapi.Resources @@ -342,21 +386,6 @@ func (o *ApplyOptions) Run() error { OpenAPIGetter: o.DiscoveryClient, } - // include the uninitialized objects by default if --prune is true - // unless explicitly set --include-uninitialized=false - r := o.Builder. - Unstructured(). - Schema(o.Validator). - ContinueOnError(). - NamespaceParam(o.Namespace).DefaultNamespace(). - FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions). - LabelSelectorParam(o.Selector). - Flatten(). - Do() - if err := r.Err(); err != nil { - return err - } - var err error if o.Prune { o.PruneResources, err = parsePruneResources(o.Mapper, o.PruneWhitelist) @@ -375,11 +404,10 @@ func (o *ApplyOptions) Run() error { count := 0 - infos, err := r.Infos() + infos, err := o.GetObjects() if err != nil { return err } - for _, info := range infos { // If server-dry-run is requested but the type doesn't support it, fail right away. From 07cb2fda5dada5c5145216bcab90e38426cf68e8 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Mon, 16 Dec 2019 15:21:57 -0800 Subject: [PATCH 3/7] Refactored some apply printing functionality; removed unneeded count and objs variables --- .../src/k8s.io/kubectl/pkg/cmd/apply/apply.go | 120 ++++++++++-------- 1 file changed, 67 insertions(+), 53 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index 20efea55288..54de97ade3a 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -347,7 +347,8 @@ func isIncompatibleServerError(err error) bool { // as a slice of pointer to resource.Info. The resource.Info contains the object // and some other denormalized data. This function should not be called until // AFTER the "complete" and "validate" methods have been called to ensure that -// the ApplyOptions is filled in and valid. +// the ApplyOptions is filled in and valid. Returns an error if the resource +// builder returns an error retrieving the objects. func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) { if !o.objectsCached { // include the uninitialized objects by default if --prune is true @@ -394,20 +395,17 @@ func (o *ApplyOptions) Run() error { } } - output := *o.PrintFlags.OutputFormat - shortOutput := output == "name" - visitedUids := sets.NewString() visitedNamespaces := sets.NewString() - var objs []runtime.Object - - count := 0 - infos, err := o.GetObjects() if err != nil { return err } + if len(infos) == 0 { + return fmt.Errorf("no objects passed to apply") + } + for _, info := range infos { // If server-dry-run is requested but the type doesn't support it, fail right away. @@ -477,9 +475,7 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) } visitedUids.Insert(string(metadata.GetUID())) - count++ - if len(output) > 0 && !shortOutput { - objs = append(objs, info.Object) + if o.shouldPrintObject() { continue } @@ -502,9 +498,6 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving modified configuration from:\n%s\nfor:", info.String()), info.Source, err) } - // Print object only if output format other than "name" is specified - printObject := len(output) > 0 && !shortOutput - if err := info.Get(); err != nil { if !errors.IsNotFound(err) { return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving current configuration of:\n%s\nfrom server for:", info.String()), info.Source, err) @@ -535,10 +528,7 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) } visitedUids.Insert(string(metadata.GetUID())) - count++ - - if printObject { - objs = append(objs, info.Object) + if o.shouldPrintObject() { continue } @@ -587,9 +577,7 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) info.Refresh(patchedObject, true) - if string(patchBytes) == "{}" && !printObject { - count++ - + if string(patchBytes) == "{}" && !o.shouldPrintObject() { printer, err := o.ToPrinter("unchanged") if err != nil { return err @@ -600,10 +588,8 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) continue } } - count++ - if printObject { - objs = append(objs, info.Object) + if o.shouldPrintObject() { continue } @@ -616,35 +602,8 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) } } - if count == 0 { - return fmt.Errorf("no objects passed to apply") - } - - // print objects - if len(objs) > 0 { - printer, err := o.ToPrinter("") - if err != nil { - return err - } - - objToPrint := objs[0] - if len(objs) > 1 { - list := &corev1.List{ - TypeMeta: metav1.TypeMeta{ - Kind: "List", - APIVersion: "v1", - }, - ListMeta: metav1.ListMeta{}, - } - if err := meta.SetList(list, objs); err != nil { - return err - } - - objToPrint = list - } - if err := printer.PrintObj(objToPrint, o.Out); err != nil { - return err - } + if err := o.printObjects(); err != nil { + return err } if !o.Prune { @@ -692,6 +651,61 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) return nil } +func (o *ApplyOptions) shouldPrintObject() bool { + // Print object only if output format other than "name" is specified + shouldPrint := false + output := *o.PrintFlags.OutputFormat + shortOutput := output == "name" + if len(output) > 0 && !shortOutput { + shouldPrint = true + } + return shouldPrint +} + +func (o *ApplyOptions) printObjects() error { + + if !o.shouldPrintObject() { + return nil + } + + infos, err := o.GetObjects() + if err != nil { + return err + } + + if len(infos) > 0 { + printer, err := o.ToPrinter("") + if err != nil { + return err + } + + objToPrint := infos[0].Object + if len(infos) > 1 { + objs := []runtime.Object{} + for _, info := range infos { + objs = append(objs, info.Object) + } + list := &corev1.List{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{}, + } + if err := meta.SetList(list, objs); err != nil { + return err + } + + objToPrint = list + } + if err := printer.PrintObj(objToPrint, o.Out); err != nil { + return err + } + } + + return nil +} + type pruneResource struct { group string version string From e38e320222a1f44373ab3eaea8713f4772b26f6b Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Mon, 16 Dec 2019 16:13:39 -0800 Subject: [PATCH 4/7] Moved prune functionality into its own file. --- .../src/k8s.io/kubectl/pkg/cmd/apply/BUILD | 1 + .../src/k8s.io/kubectl/pkg/cmd/apply/apply.go | 224 +--------------- .../src/k8s.io/kubectl/pkg/cmd/apply/prune.go | 243 ++++++++++++++++++ 3 files changed, 254 insertions(+), 214 deletions(-) create mode 100644 staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD b/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD index 6fcd40be5f6..03b487639b1 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD @@ -7,6 +7,7 @@ go_library( "apply_edit_last_applied.go", "apply_set_last_applied.go", "apply_view_last_applied.go", + "prune.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/kubectl/pkg/cmd/apply", importpath = "k8s.io/kubectl/pkg/cmd/apply", diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index 54de97ade3a..9fb43eb0c1c 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "net/http" - "strings" "time" "github.com/jonboulle/clockwork" @@ -282,6 +281,13 @@ func (o *ApplyOptions) Complete(f cmdutil.Factory, cmd *cobra.Command) error { return err } + if o.Prune { + o.PruneResources, err = parsePruneResources(o.Mapper, o.PruneWhitelist) + if err != nil { + return err + } + } + return nil } @@ -302,37 +308,6 @@ func validatePruneAll(prune, all bool, selector string) error { return nil } -func parsePruneResources(mapper meta.RESTMapper, gvks []string) ([]pruneResource, error) { - pruneResources := []pruneResource{} - for _, groupVersionKind := range gvks { - gvk := strings.Split(groupVersionKind, "/") - if len(gvk) != 3 { - return nil, fmt.Errorf("invalid GroupVersionKind format: %v, please follow ", groupVersionKind) - } - - if gvk[0] == "core" { - gvk[0] = "" - } - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk[0], Kind: gvk[2]}, gvk[1]) - if err != nil { - return pruneResources, err - } - var namespaced bool - namespaceScope := mapping.Scope.Name() - switch namespaceScope { - case meta.RESTScopeNameNamespace: - namespaced = true - case meta.RESTScopeNameRoot: - namespaced = false - default: - return pruneResources, fmt.Errorf("Unknown namespace scope: %q", namespaceScope) - } - - pruneResources = append(pruneResources, pruneResource{gvk[0], gvk[1], gvk[2], namespaced}) - } - return pruneResources, nil -} - func isIncompatibleServerError(err error) bool { // 415: Unsupported media type means we're talking to a server which doesn't // support server-side apply. @@ -387,14 +362,6 @@ func (o *ApplyOptions) Run() error { OpenAPIGetter: o.DiscoveryClient, } - var err error - if o.Prune { - o.PruneResources, err = parsePruneResources(o.Mapper, o.PruneWhitelist) - if err != nil { - return err - } - } - visitedUids := sets.NewString() visitedNamespaces := sets.NewString() @@ -606,46 +573,9 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) return err } - if !o.Prune { - return nil - } - - p := pruner{ - mapper: o.Mapper, - dynamicClient: o.DynamicClient, - - labelSelector: o.Selector, - visitedUids: visitedUids, - - cascade: o.DeleteOptions.Cascade, - dryRun: o.DryRun, - serverDryRun: o.ServerDryRun, - gracePeriod: o.DeleteOptions.GracePeriod, - - toPrinter: o.ToPrinter, - - out: o.Out, - } - - namespacedRESTMappings, nonNamespacedRESTMappings, err := getRESTMappings(o.Mapper, &(o.PruneResources)) - if err != nil { - return fmt.Errorf("error retrieving RESTMappings to prune: %v", err) - } - - for n := range visitedNamespaces { - if len(o.Namespace) != 0 && n != o.Namespace { - continue - } - for _, m := range namespacedRESTMappings { - if err := p.prune(n, m); err != nil { - return fmt.Errorf("error pruning namespaced object %v: %v", m.GroupVersionKind, err) - } - } - } - for _, m := range nonNamespacedRESTMappings { - if err := p.prune(metav1.NamespaceNone, m); err != nil { - return fmt.Errorf("error pruning nonNamespaced object %v: %v", m.GroupVersionKind, err) - } + if o.Prune { + p := newPruner(o, visitedUids, visitedNamespaces) + return p.pruneAll(o) } return nil @@ -706,140 +636,6 @@ func (o *ApplyOptions) printObjects() error { return nil } -type pruneResource struct { - group string - version string - kind string - namespaced bool -} - -func (pr pruneResource) String() string { - return fmt.Sprintf("%v/%v, Kind=%v, Namespaced=%v", pr.group, pr.version, pr.kind, pr.namespaced) -} - -func getRESTMappings(mapper meta.RESTMapper, pruneResources *[]pruneResource) (namespaced, nonNamespaced []*meta.RESTMapping, err error) { - if len(*pruneResources) == 0 { - // default whitelist - // TODO: need to handle the older api versions - e.g. v1beta1 jobs. Github issue: #35991 - *pruneResources = []pruneResource{ - {"", "v1", "ConfigMap", true}, - {"", "v1", "Endpoints", true}, - {"", "v1", "Namespace", false}, - {"", "v1", "PersistentVolumeClaim", true}, - {"", "v1", "PersistentVolume", false}, - {"", "v1", "Pod", true}, - {"", "v1", "ReplicationController", true}, - {"", "v1", "Secret", true}, - {"", "v1", "Service", true}, - {"batch", "v1", "Job", true}, - {"batch", "v1beta1", "CronJob", true}, - {"extensions", "v1beta1", "Ingress", true}, - {"apps", "v1", "DaemonSet", true}, - {"apps", "v1", "Deployment", true}, - {"apps", "v1", "ReplicaSet", true}, - {"apps", "v1", "StatefulSet", true}, - } - } - - for _, resource := range *pruneResources { - addedMapping, err := mapper.RESTMapping(schema.GroupKind{Group: resource.group, Kind: resource.kind}, resource.version) - if err != nil { - return nil, nil, fmt.Errorf("invalid resource %v: %v", resource, err) - } - if resource.namespaced { - namespaced = append(namespaced, addedMapping) - } else { - nonNamespaced = append(nonNamespaced, addedMapping) - } - } - - return namespaced, nonNamespaced, nil -} - -type pruner struct { - mapper meta.RESTMapper - dynamicClient dynamic.Interface - - visitedUids sets.String - labelSelector string - fieldSelector string - - cascade bool - serverDryRun bool - dryRun bool - gracePeriod int - - toPrinter func(string) (printers.ResourcePrinter, error) - - out io.Writer -} - -func (p *pruner) prune(namespace string, mapping *meta.RESTMapping) error { - objList, err := p.dynamicClient.Resource(mapping.Resource). - Namespace(namespace). - List(metav1.ListOptions{ - LabelSelector: p.labelSelector, - FieldSelector: p.fieldSelector, - }) - if err != nil { - return err - } - - objs, err := meta.ExtractList(objList) - if err != nil { - return err - } - - for _, obj := range objs { - metadata, err := meta.Accessor(obj) - if err != nil { - return err - } - annots := metadata.GetAnnotations() - if _, ok := annots[corev1.LastAppliedConfigAnnotation]; !ok { - // don't prune resources not created with apply - continue - } - uid := metadata.GetUID() - if p.visitedUids.Has(string(uid)) { - continue - } - name := metadata.GetName() - if !p.dryRun { - if err := p.delete(namespace, name, mapping); err != nil { - return err - } - } - - printer, err := p.toPrinter("pruned") - if err != nil { - return err - } - printer.PrintObj(obj, p.out) - } - return nil -} - -func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping) error { - return runDelete(namespace, name, mapping, p.dynamicClient, p.cascade, p.gracePeriod, p.serverDryRun) -} - -func runDelete(namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascade bool, gracePeriod int, serverDryRun bool) error { - options := &metav1.DeleteOptions{} - if gracePeriod >= 0 { - options = metav1.NewDeleteOptions(int64(gracePeriod)) - } - if serverDryRun { - options.DryRun = []string{metav1.DryRunAll} - } - policy := metav1.DeletePropagationForeground - if !cascade { - policy = metav1.DeletePropagationOrphan - } - options.PropagationPolicy = &policy - return c.Resource(mapping.Resource).Namespace(namespace).Delete(name, options) -} - func (p *Patcher) delete(namespace, name string) error { return runDelete(namespace, name, p.Mapping, p.DynamicClient, p.Cascade, p.GracePeriod, p.ServerDryRun) } diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go new file mode 100644 index 00000000000..3987aa48628 --- /dev/null +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go @@ -0,0 +1,243 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apply + +import ( + "fmt" + "io" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/cli-runtime/pkg/printers" + "k8s.io/client-go/dynamic" +) + +type pruner struct { + mapper meta.RESTMapper + dynamicClient dynamic.Interface + + visitedUids sets.String + visitedNamespaces sets.String + labelSelector string + fieldSelector string + + cascade bool + serverDryRun bool + dryRun bool + gracePeriod int + + toPrinter func(string) (printers.ResourcePrinter, error) + + out io.Writer +} + +func newPruner(o *ApplyOptions, visitedUids sets.String, visitedNamespaces sets.String) pruner { + return pruner{ + mapper: o.Mapper, + dynamicClient: o.DynamicClient, + + labelSelector: o.Selector, + visitedUids: visitedUids, + visitedNamespaces: visitedNamespaces, + + cascade: o.DeleteOptions.Cascade, + dryRun: o.DryRun, + serverDryRun: o.ServerDryRun, + gracePeriod: o.DeleteOptions.GracePeriod, + + toPrinter: o.ToPrinter, + + out: o.Out, + } +} + +func (p *pruner) pruneAll(o *ApplyOptions) error { + + namespacedRESTMappings, nonNamespacedRESTMappings, err := getRESTMappings(o.Mapper, &(o.PruneResources)) + if err != nil { + return fmt.Errorf("error retrieving RESTMappings to prune: %v", err) + } + + for n := range p.visitedNamespaces { + if len(o.Namespace) != 0 && n != o.Namespace { + continue + } + for _, m := range namespacedRESTMappings { + if err := p.prune(n, m); err != nil { + return fmt.Errorf("error pruning namespaced object %v: %v", m.GroupVersionKind, err) + } + } + } + for _, m := range nonNamespacedRESTMappings { + if err := p.prune(metav1.NamespaceNone, m); err != nil { + return fmt.Errorf("error pruning nonNamespaced object %v: %v", m.GroupVersionKind, err) + } + } + + return nil +} + +func (p *pruner) prune(namespace string, mapping *meta.RESTMapping) error { + objList, err := p.dynamicClient.Resource(mapping.Resource). + Namespace(namespace). + List(metav1.ListOptions{ + LabelSelector: p.labelSelector, + FieldSelector: p.fieldSelector, + }) + if err != nil { + return err + } + + objs, err := meta.ExtractList(objList) + if err != nil { + return err + } + + for _, obj := range objs { + metadata, err := meta.Accessor(obj) + if err != nil { + return err + } + annots := metadata.GetAnnotations() + if _, ok := annots[corev1.LastAppliedConfigAnnotation]; !ok { + // don't prune resources not created with apply + continue + } + uid := metadata.GetUID() + if p.visitedUids.Has(string(uid)) { + continue + } + name := metadata.GetName() + if !p.dryRun { + if err := p.delete(namespace, name, mapping); err != nil { + return err + } + } + + printer, err := p.toPrinter("pruned") + if err != nil { + return err + } + printer.PrintObj(obj, p.out) + } + return nil +} + +func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping) error { + return runDelete(namespace, name, mapping, p.dynamicClient, p.cascade, p.gracePeriod, p.serverDryRun) +} + +func runDelete(namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascade bool, gracePeriod int, serverDryRun bool) error { + options := &metav1.DeleteOptions{} + if gracePeriod >= 0 { + options = metav1.NewDeleteOptions(int64(gracePeriod)) + } + if serverDryRun { + options.DryRun = []string{metav1.DryRunAll} + } + policy := metav1.DeletePropagationForeground + if !cascade { + policy = metav1.DeletePropagationOrphan + } + options.PropagationPolicy = &policy + return c.Resource(mapping.Resource).Namespace(namespace).Delete(name, options) +} + +type pruneResource struct { + group string + version string + kind string + namespaced bool +} + +func (pr pruneResource) String() string { + return fmt.Sprintf("%v/%v, Kind=%v, Namespaced=%v", pr.group, pr.version, pr.kind, pr.namespaced) +} + +func getRESTMappings(mapper meta.RESTMapper, pruneResources *[]pruneResource) (namespaced, nonNamespaced []*meta.RESTMapping, err error) { + if len(*pruneResources) == 0 { + // default whitelist + // TODO: need to handle the older api versions - e.g. v1beta1 jobs. Github issue: #35991 + *pruneResources = []pruneResource{ + {"", "v1", "ConfigMap", true}, + {"", "v1", "Endpoints", true}, + {"", "v1", "Namespace", false}, + {"", "v1", "PersistentVolumeClaim", true}, + {"", "v1", "PersistentVolume", false}, + {"", "v1", "Pod", true}, + {"", "v1", "ReplicationController", true}, + {"", "v1", "Secret", true}, + {"", "v1", "Service", true}, + {"batch", "v1", "Job", true}, + {"batch", "v1beta1", "CronJob", true}, + {"extensions", "v1beta1", "Ingress", true}, + {"apps", "v1", "DaemonSet", true}, + {"apps", "v1", "Deployment", true}, + {"apps", "v1", "ReplicaSet", true}, + {"apps", "v1", "StatefulSet", true}, + } + } + + for _, resource := range *pruneResources { + addedMapping, err := mapper.RESTMapping(schema.GroupKind{Group: resource.group, Kind: resource.kind}, resource.version) + if err != nil { + return nil, nil, fmt.Errorf("invalid resource %v: %v", resource, err) + } + if resource.namespaced { + namespaced = append(namespaced, addedMapping) + } else { + nonNamespaced = append(nonNamespaced, addedMapping) + } + } + + return namespaced, nonNamespaced, nil +} + +func parsePruneResources(mapper meta.RESTMapper, gvks []string) ([]pruneResource, error) { + pruneResources := []pruneResource{} + for _, groupVersionKind := range gvks { + gvk := strings.Split(groupVersionKind, "/") + if len(gvk) != 3 { + return nil, fmt.Errorf("invalid GroupVersionKind format: %v, please follow ", groupVersionKind) + } + + if gvk[0] == "core" { + gvk[0] = "" + } + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk[0], Kind: gvk[2]}, gvk[1]) + if err != nil { + return pruneResources, err + } + var namespaced bool + namespaceScope := mapping.Scope.Name() + switch namespaceScope { + case meta.RESTScopeNameNamespace: + namespaced = true + case meta.RESTScopeNameRoot: + namespaced = false + default: + return pruneResources, fmt.Errorf("Unknown namespace scope: %q", namespaceScope) + } + + pruneResources = append(pruneResources, pruneResource{gvk[0], gvk[1], gvk[2], namespaced}) + } + return pruneResources, nil +} From 670369f2a4a7a7d9dad3a1ae1a334317a135f6c7 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Mon, 16 Dec 2019 16:41:49 -0800 Subject: [PATCH 5/7] Move patch functionality for apply into its own file. --- .../src/k8s.io/kubectl/pkg/cmd/apply/BUILD | 1 + .../src/k8s.io/kubectl/pkg/cmd/apply/apply.go | 229 +-------------- .../k8s.io/kubectl/pkg/cmd/apply/patcher.go | 265 ++++++++++++++++++ 3 files changed, 267 insertions(+), 228 deletions(-) create mode 100644 staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD b/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD index 03b487639b1..28da9ff59d5 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD @@ -7,6 +7,7 @@ go_library( "apply_edit_last_applied.go", "apply_set_last_applied.go", "apply_view_last_applied.go", + "patcher.go", "prune.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/kubectl/pkg/cmd/apply", diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index 9fb43eb0c1c..dd9e5bc457e 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -17,13 +17,9 @@ limitations under the License. package apply import ( - "encoding/json" "fmt" - "io" "net/http" - "time" - "github.com/jonboulle/clockwork" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -33,18 +29,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/jsonmergepatch" - "k8s.io/apimachinery/pkg/util/mergepatch" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/printers" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/klog" - oapi "k8s.io/kube-openapi/pkg/util/proto" "k8s.io/kubectl/pkg/cmd/delete" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/scheme" @@ -103,15 +94,6 @@ type ApplyOptions struct { objectsCached bool } -const ( - // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure - maxPatchRetry = 5 - // backOffPeriod is the period to back off when apply patch results in error. - backOffPeriod = 1 * time.Second - // how many times we can retry before back off - triesBeforeBackOff = 1 -) - var ( applyLong = templates.LongDesc(i18n.T(` Apply a configuration to a resource by filename or stdin. @@ -352,10 +334,6 @@ func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) { // Run executes the `apply` command. func (o *ApplyOptions) Run() error { - var openapiSchema openapi.Resources - if o.OpenAPIPatch { - openapiSchema = o.OpenAPISchema - } dryRunVerifier := &DryRunVerifier{ Finder: cmdutil.NewCRDFinder(cmdutil.CRDFromDynamic(o.DynamicClient)), @@ -521,22 +499,7 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) fmt.Fprintf(o.ErrOut, warningNoLastAppliedConfigAnnotation, o.cmdBaseName) } - helper := resource.NewHelper(info.Client, info.Mapping) - patcher := &Patcher{ - Mapping: info.Mapping, - Helper: helper, - DynamicClient: o.DynamicClient, - Overwrite: o.Overwrite, - BackOff: clockwork.NewRealClock(), - Force: o.DeleteOptions.ForceDeletion, - Cascade: o.DeleteOptions.Cascade, - Timeout: o.DeleteOptions.Timeout, - GracePeriod: o.DeleteOptions.GracePeriod, - ServerDryRun: o.ServerDryRun, - OpenapiSchema: openapiSchema, - Retries: maxPatchRetry, - } - + patcher := newPatcher(o, info) patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut) if err != nil { return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err) @@ -636,34 +599,6 @@ func (o *ApplyOptions) printObjects() error { return nil } -func (p *Patcher) delete(namespace, name string) error { - return runDelete(namespace, name, p.Mapping, p.DynamicClient, p.Cascade, p.GracePeriod, p.ServerDryRun) -} - -// Patcher defines options to patch OpenAPI objects. -type Patcher struct { - Mapping *meta.RESTMapping - Helper *resource.Helper - DynamicClient dynamic.Interface - - Overwrite bool - BackOff clockwork.Clock - - Force bool - Cascade bool - Timeout time.Duration - GracePeriod int - ServerDryRun bool - - // If set, forces the patch against a specific resourceVersion - ResourceVersion *string - - // Number of retries to make if the patch fails with conflict - Retries int - - OpenapiSchema openapi.Resources -} - // DryRunVerifier verifies if a given group-version-kind supports DryRun // against the current server. Sending dryRun requests to apiserver that // don't support it will result in objects being unwillingly persisted. @@ -702,165 +637,3 @@ func (v *DryRunVerifier) HasSupport(gvk schema.GroupVersionKind) error { } return nil } - -func addResourceVersion(patch []byte, rv string) ([]byte, error) { - var patchMap map[string]interface{} - err := json.Unmarshal(patch, &patchMap) - if err != nil { - return nil, err - } - u := unstructured.Unstructured{Object: patchMap} - a, err := meta.Accessor(&u) - if err != nil { - return nil, err - } - a.SetResourceVersion(rv) - - return json.Marshal(patchMap) -} - -func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { - // Serialize the current configuration of the object from the server. - current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err) - } - - // Retrieve the original configuration of the object from the annotation. - original, err := util.GetOriginalConfiguration(obj) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err) - } - - var patchType types.PatchType - var patch []byte - var lookupPatchMeta strategicpatch.LookupPatchMeta - var schema oapi.Schema - createPatchErrFormat := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:" - - // Create the versioned struct from the type defined in the restmapping - // (which is the API version we'll be submitting the patch to) - versionedObject, err := scheme.Scheme.New(p.Mapping.GroupVersionKind) - switch { - case runtime.IsNotRegisteredError(err): - // fall back to generic JSON merge patch - patchType = types.MergePatchType - preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"), - mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")} - patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...) - if err != nil { - if mergepatch.IsPreconditionFailed(err) { - return nil, nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed") - } - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) - } - case err != nil: - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("getting instance of versioned object for %v:", p.Mapping.GroupVersionKind), source, err) - case err == nil: - // Compute a three way strategic merge patch to send to server. - patchType = types.StrategicMergePatchType - - // Try to use openapi first if the openapi spec is available and can successfully calculate the patch. - // Otherwise, fall back to baked-in types. - if p.OpenapiSchema != nil { - if schema = p.OpenapiSchema.LookupResource(p.Mapping.GroupVersionKind); schema != nil { - lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema} - if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil { - fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err) - } else { - patchType = types.StrategicMergePatchType - patch = openapiPatch - } - } - } - - if patch == nil { - lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) - } - patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) - } - } - } - - if string(patch) == "{}" { - return patch, obj, nil - } - - if p.ResourceVersion != nil { - patch, err = addResourceVersion(patch, *p.ResourceVersion) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err) - } - } - - options := metav1.PatchOptions{} - if p.ServerDryRun { - options.DryRun = []string{metav1.DryRunAll} - } - - patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, &options) - return patch, patchedObj, err -} - -// Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well -// the final patched object. On failure, returns an error. -func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { - var getErr error - patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut) - if p.Retries == 0 { - p.Retries = maxPatchRetry - } - for i := 1; i <= p.Retries && errors.IsConflict(err); i++ { - if i > triesBeforeBackOff { - p.BackOff.Sleep(backOffPeriod) - } - current, getErr = p.Helper.Get(namespace, name, false) - if getErr != nil { - return nil, nil, getErr - } - patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut) - } - if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force { - patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name) - } - return patchBytes, patchObject, err -} - -func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) { - if err := p.delete(namespace, name); err != nil { - return modified, nil, err - } - // TODO: use wait - if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) { - if _, err := p.Helper.Get(namespace, name, false); !errors.IsNotFound(err) { - return false, err - } - return true, nil - }); err != nil { - return modified, nil, err - } - versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil) - if err != nil { - return modified, nil, err - } - options := metav1.CreateOptions{} - if p.ServerDryRun { - options.DryRun = []string{metav1.DryRunAll} - } - createdObject, err := p.Helper.Create(namespace, true, versionedObject, &options) - if err != nil { - // restore the original object if we fail to create the new one - // but still propagate and advertise error to user - recreated, recreateErr := p.Helper.Create(namespace, true, original, &options) - if recreateErr != nil { - err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr) - } else { - createdObject = recreated - } - } - return modified, createdObject, err -} diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go new file mode 100644 index 00000000000..d606e1de258 --- /dev/null +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go @@ -0,0 +1,265 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apply + +import ( + "encoding/json" + "fmt" + "io" + "time" + + "github.com/jonboulle/clockwork" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/jsonmergepatch" + "k8s.io/apimachinery/pkg/util/mergepatch" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/dynamic" + oapi "k8s.io/kube-openapi/pkg/util/proto" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + "k8s.io/kubectl/pkg/scheme" + "k8s.io/kubectl/pkg/util" + "k8s.io/kubectl/pkg/util/openapi" +) + +const ( + // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure + maxPatchRetry = 5 + // backOffPeriod is the period to back off when apply patch results in error. + backOffPeriod = 1 * time.Second + // how many times we can retry before back off + triesBeforeBackOff = 1 +) + +// Patcher defines options to patch OpenAPI objects. +type Patcher struct { + Mapping *meta.RESTMapping + Helper *resource.Helper + DynamicClient dynamic.Interface + + Overwrite bool + BackOff clockwork.Clock + + Force bool + Cascade bool + Timeout time.Duration + GracePeriod int + ServerDryRun bool + + // If set, forces the patch against a specific resourceVersion + ResourceVersion *string + + // Number of retries to make if the patch fails with conflict + Retries int + + OpenapiSchema openapi.Resources +} + +func newPatcher(o *ApplyOptions, info *resource.Info) *Patcher { + var openapiSchema openapi.Resources + if o.OpenAPIPatch { + openapiSchema = o.OpenAPISchema + } + + helper := resource.NewHelper(info.Client, info.Mapping) + return &Patcher{ + Mapping: info.Mapping, + Helper: helper, + DynamicClient: o.DynamicClient, + Overwrite: o.Overwrite, + BackOff: clockwork.NewRealClock(), + Force: o.DeleteOptions.ForceDeletion, + Cascade: o.DeleteOptions.Cascade, + Timeout: o.DeleteOptions.Timeout, + GracePeriod: o.DeleteOptions.GracePeriod, + ServerDryRun: o.ServerDryRun, + OpenapiSchema: openapiSchema, + Retries: maxPatchRetry, + } +} + +func (p *Patcher) delete(namespace, name string) error { + return runDelete(namespace, name, p.Mapping, p.DynamicClient, p.Cascade, p.GracePeriod, p.ServerDryRun) +} + +func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { + // Serialize the current configuration of the object from the server. + current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err) + } + + // Retrieve the original configuration of the object from the annotation. + original, err := util.GetOriginalConfiguration(obj) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err) + } + + var patchType types.PatchType + var patch []byte + var lookupPatchMeta strategicpatch.LookupPatchMeta + var schema oapi.Schema + createPatchErrFormat := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:" + + // Create the versioned struct from the type defined in the restmapping + // (which is the API version we'll be submitting the patch to) + versionedObject, err := scheme.Scheme.New(p.Mapping.GroupVersionKind) + switch { + case runtime.IsNotRegisteredError(err): + // fall back to generic JSON merge patch + patchType = types.MergePatchType + preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"), + mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")} + patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...) + if err != nil { + if mergepatch.IsPreconditionFailed(err) { + return nil, nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed") + } + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) + } + case err != nil: + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("getting instance of versioned object for %v:", p.Mapping.GroupVersionKind), source, err) + case err == nil: + // Compute a three way strategic merge patch to send to server. + patchType = types.StrategicMergePatchType + + // Try to use openapi first if the openapi spec is available and can successfully calculate the patch. + // Otherwise, fall back to baked-in types. + if p.OpenapiSchema != nil { + if schema = p.OpenapiSchema.LookupResource(p.Mapping.GroupVersionKind); schema != nil { + lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema} + if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil { + fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err) + } else { + patchType = types.StrategicMergePatchType + patch = openapiPatch + } + } + } + + if patch == nil { + lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) + } + patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) + } + } + } + + if string(patch) == "{}" { + return patch, obj, nil + } + + if p.ResourceVersion != nil { + patch, err = addResourceVersion(patch, *p.ResourceVersion) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err) + } + } + + options := metav1.PatchOptions{} + if p.ServerDryRun { + options.DryRun = []string{metav1.DryRunAll} + } + + patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, &options) + return patch, patchedObj, err +} + +// Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well +// the final patched object. On failure, returns an error. +func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { + var getErr error + patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut) + if p.Retries == 0 { + p.Retries = maxPatchRetry + } + for i := 1; i <= p.Retries && errors.IsConflict(err); i++ { + if i > triesBeforeBackOff { + p.BackOff.Sleep(backOffPeriod) + } + current, getErr = p.Helper.Get(namespace, name, false) + if getErr != nil { + return nil, nil, getErr + } + patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut) + } + if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force { + patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name) + } + return patchBytes, patchObject, err +} + +func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) { + if err := p.delete(namespace, name); err != nil { + return modified, nil, err + } + // TODO: use wait + if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) { + if _, err := p.Helper.Get(namespace, name, false); !errors.IsNotFound(err) { + return false, err + } + return true, nil + }); err != nil { + return modified, nil, err + } + versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil) + if err != nil { + return modified, nil, err + } + options := metav1.CreateOptions{} + if p.ServerDryRun { + options.DryRun = []string{metav1.DryRunAll} + } + createdObject, err := p.Helper.Create(namespace, true, versionedObject, &options) + if err != nil { + // restore the original object if we fail to create the new one + // but still propagate and advertise error to user + recreated, recreateErr := p.Helper.Create(namespace, true, original, &options) + if recreateErr != nil { + err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr) + } else { + createdObject = recreated + } + } + return modified, createdObject, err +} + +func addResourceVersion(patch []byte, rv string) ([]byte, error) { + var patchMap map[string]interface{} + err := json.Unmarshal(patch, &patchMap) + if err != nil { + return nil, err + } + u := unstructured.Unstructured{Object: patchMap} + a, err := meta.Accessor(&u) + if err != nil { + return nil, err + } + a.SetResourceVersion(rv) + + return json.Marshal(patchMap) +} From 4e45421f610eb881a03c0d82fca038788db12b08 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Tue, 17 Dec 2019 14:19:00 -0800 Subject: [PATCH 6/7] Moves visitedUids and visitedNamespaces (used for pruning) into ApplyOptions --- .../src/k8s.io/kubectl/pkg/cmd/apply/apply.go | 60 ++++++++++++++----- .../src/k8s.io/kubectl/pkg/cmd/apply/prune.go | 6 +- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index dd9e5bc457e..f0e1693f338 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -92,6 +92,19 @@ type ApplyOptions struct { // not call the resource builder; only return the set objects. objects []*resource.Info objectsCached bool + + // Stores visited objects/namespaces for later use + // calculating the set of objects to prune. + VisitedUids sets.String + VisitedNamespaces sets.String + + // Function run after the objects are generated and + // stored in the "objects" field, but before the + // apply is run on these objects. + preProcessorFn func(*ApplyOptions) error + // Function run after all objects have been applied. + // The standard postprocessorFn is "PrintAndPrune()". + postProcessorFn func(*ApplyOptions) error } var ( @@ -140,6 +153,11 @@ func NewApplyOptions(ioStreams genericclioptions.IOStreams) *ApplyOptions { objects: []*resource.Info{}, objectsCached: false, + + VisitedUids: sets.NewString(), + VisitedNamespaces: sets.NewString(), + + postProcessorFn: PrintAndPrune, } } @@ -340,9 +358,6 @@ func (o *ApplyOptions) Run() error { OpenAPIGetter: o.DiscoveryClient, } - visitedUids := sets.NewString() - visitedNamespaces := sets.NewString() - infos, err := o.GetObjects() if err != nil { return err @@ -360,9 +375,7 @@ func (o *ApplyOptions) Run() error { } } - if info.Namespaced() { - visitedNamespaces.Insert(info.Namespace) - } + o.MarkNamespaceVisited(info) if err := o.Recorder.Record(info.Object); err != nil { klog.V(4).Infof("error recording current command: %v", err) @@ -414,12 +427,11 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) } info.Refresh(obj, true) - metadata, err := meta.Accessor(info.Object) - if err != nil { + + if err := o.MarkObjectVisited(info); err != nil { return err } - visitedUids.Insert(string(metadata.GetUID())) if o.shouldPrintObject() { continue } @@ -467,11 +479,9 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) info.Refresh(obj, true) } - metadata, err := meta.Accessor(info.Object) - if err != nil { + if err := o.MarkObjectVisited(info); err != nil { return err } - visitedUids.Insert(string(metadata.GetUID())) if o.shouldPrintObject() { continue @@ -487,13 +497,12 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) continue } - metadata, err := meta.Accessor(info.Object) - if err != nil { + if err := o.MarkObjectVisited(info); err != nil { return err } - visitedUids.Insert(string(metadata.GetUID())) if !o.DryRun { + metadata, _ := meta.Accessor(info.Object) annotationMap := metadata.GetAnnotations() if _, ok := annotationMap[corev1.LastAppliedConfigAnnotation]; !ok { fmt.Fprintf(o.ErrOut, warningNoLastAppliedConfigAnnotation, o.cmdBaseName) @@ -537,7 +546,7 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) } if o.Prune { - p := newPruner(o, visitedUids, visitedNamespaces) + p := newPruner(o) return p.pruneAll(o) } @@ -599,6 +608,25 @@ func (o *ApplyOptions) printObjects() error { return nil } +// MarkNamespaceVisited keeps track of which namespaces the applied +// objects belong to. Used for pruning. +func (o *ApplyOptions) MarkNamespaceVisited(info *resource.Info) { + if info.Namespaced() { + o.VisitedNamespaces.Insert(info.Namespace) + } +} + +// MarkNamespaceVisited keeps track of UIDs of the applied +// objects. Used for pruning. +func (o *ApplyOptions) MarkObjectVisited(info *resource.Info) error { + metadata, err := meta.Accessor(info.Object) + if err != nil { + return err + } + o.VisitedUids.Insert(string(metadata.GetUID())) + return nil +} + // DryRunVerifier verifies if a given group-version-kind supports DryRun // against the current server. Sending dryRun requests to apiserver that // don't support it will result in objects being unwillingly persisted. diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go index 3987aa48628..eb118b5046d 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/prune.go @@ -49,14 +49,14 @@ type pruner struct { out io.Writer } -func newPruner(o *ApplyOptions, visitedUids sets.String, visitedNamespaces sets.String) pruner { +func newPruner(o *ApplyOptions) pruner { return pruner{ mapper: o.Mapper, dynamicClient: o.DynamicClient, labelSelector: o.Selector, - visitedUids: visitedUids, - visitedNamespaces: visitedNamespaces, + visitedUids: o.VisitedUids, + visitedNamespaces: o.VisitedNamespaces, cascade: o.DeleteOptions.Cascade, dryRun: o.DryRun, From 79bc63fb4b79caea4b141d318b11a0892ca14c95 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Tue, 17 Dec 2019 14:58:54 -0800 Subject: [PATCH 7/7] Adds PreProcessor and PostProcessor functions for modifying apply behavior --- .../src/k8s.io/kubectl/pkg/cmd/apply/apply.go | 60 +++++++++++++++---- 1 file changed, 47 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index f0e1693f338..1bc989f7f20 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -101,10 +101,10 @@ type ApplyOptions struct { // Function run after the objects are generated and // stored in the "objects" field, but before the // apply is run on these objects. - preProcessorFn func(*ApplyOptions) error + PreProcessorFn func() error // Function run after all objects have been applied. - // The standard postprocessorFn is "PrintAndPrune()". - postProcessorFn func(*ApplyOptions) error + // The standard PostProcessorFn is "PrintAndPrunePostProcessor()". + PostProcessorFn func() error } var ( @@ -156,8 +156,6 @@ func NewApplyOptions(ioStreams genericclioptions.IOStreams) *ApplyOptions { VisitedUids: sets.NewString(), VisitedNamespaces: sets.NewString(), - - postProcessorFn: PrintAndPrune, } } @@ -288,6 +286,8 @@ func (o *ApplyOptions) Complete(f cmdutil.Factory, cmd *cobra.Command) error { } } + o.PostProcessorFn = o.PrintAndPrunePostProcessor() + return nil } @@ -350,6 +350,13 @@ func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) { return o.objects, nil } +// SetObjects stores the set of objects (as resource.Info) to be +// subsequently applied. +func (o *ApplyOptions) SetObjects(infos []*resource.Info) { + o.objects = infos + o.objectsCached = true +} + // Run executes the `apply` command. func (o *ApplyOptions) Run() error { @@ -358,6 +365,15 @@ func (o *ApplyOptions) Run() error { OpenAPIGetter: o.DiscoveryClient, } + if o.PreProcessorFn != nil { + klog.V(4).Infof("Running apply pre-processor function") + if err := o.PreProcessorFn(); err != nil { + return err + } + } + + // Generates the objects using the resource builder if they have not + // already been stored by calling "SetObjects()" in the pre-processor. infos, err := o.GetObjects() if err != nil { return err @@ -365,7 +381,6 @@ func (o *ApplyOptions) Run() error { if len(infos) == 0 { return fmt.Errorf("no objects passed to apply") } - for _, info := range infos { // If server-dry-run is requested but the type doesn't support it, fail right away. @@ -541,13 +556,11 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) } } - if err := o.printObjects(); err != nil { - return err - } - - if o.Prune { - p := newPruner(o) - return p.pruneAll(o) + if o.PostProcessorFn != nil { + klog.V(4).Infof("Running apply post-processor function") + if err := o.PostProcessorFn(); err != nil { + return err + } } return nil @@ -627,6 +640,27 @@ func (o *ApplyOptions) MarkObjectVisited(info *resource.Info) error { return nil } +// PrintAndPrune returns a function which meets the PostProcessorFn +// function signature. This returned function prints all the +// objects as a list (if configured for that), and prunes the +// objects not applied. The returned function is the standard +// apply post processor. +func (o *ApplyOptions) PrintAndPrunePostProcessor() func() error { + + return func() error { + if err := o.printObjects(); err != nil { + return err + } + + if o.Prune { + p := newPruner(o) + return p.pruneAll(o) + } + + return nil + } +} + // DryRunVerifier verifies if a given group-version-kind supports DryRun // against the current server. Sending dryRun requests to apiserver that // don't support it will result in objects being unwillingly persisted.