diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD b/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD index 03b487639b1..28da9ff59d5 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/BUILD @@ -7,6 +7,7 @@ go_library( "apply_edit_last_applied.go", "apply_set_last_applied.go", "apply_view_last_applied.go", + "patcher.go", "prune.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/kubectl/pkg/cmd/apply", diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go index 9fb43eb0c1c..dd9e5bc457e 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/apply.go @@ -17,13 +17,9 @@ limitations under the License. package apply import ( - "encoding/json" "fmt" - "io" "net/http" - "time" - "github.com/jonboulle/clockwork" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -33,18 +29,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/jsonmergepatch" - "k8s.io/apimachinery/pkg/util/mergepatch" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/printers" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/klog" - oapi "k8s.io/kube-openapi/pkg/util/proto" "k8s.io/kubectl/pkg/cmd/delete" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/scheme" @@ -103,15 +94,6 @@ type ApplyOptions struct { objectsCached bool } -const ( - // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure - maxPatchRetry = 5 - // backOffPeriod is the period to back off when apply patch results in error. - backOffPeriod = 1 * time.Second - // how many times we can retry before back off - triesBeforeBackOff = 1 -) - var ( applyLong = templates.LongDesc(i18n.T(` Apply a configuration to a resource by filename or stdin. @@ -352,10 +334,6 @@ func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) { // Run executes the `apply` command. func (o *ApplyOptions) Run() error { - var openapiSchema openapi.Resources - if o.OpenAPIPatch { - openapiSchema = o.OpenAPISchema - } dryRunVerifier := &DryRunVerifier{ Finder: cmdutil.NewCRDFinder(cmdutil.CRDFromDynamic(o.DynamicClient)), @@ -521,22 +499,7 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err) fmt.Fprintf(o.ErrOut, warningNoLastAppliedConfigAnnotation, o.cmdBaseName) } - helper := resource.NewHelper(info.Client, info.Mapping) - patcher := &Patcher{ - Mapping: info.Mapping, - Helper: helper, - DynamicClient: o.DynamicClient, - Overwrite: o.Overwrite, - BackOff: clockwork.NewRealClock(), - Force: o.DeleteOptions.ForceDeletion, - Cascade: o.DeleteOptions.Cascade, - Timeout: o.DeleteOptions.Timeout, - GracePeriod: o.DeleteOptions.GracePeriod, - ServerDryRun: o.ServerDryRun, - OpenapiSchema: openapiSchema, - Retries: maxPatchRetry, - } - + patcher := newPatcher(o, info) patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut) if err != nil { return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err) @@ -636,34 +599,6 @@ func (o *ApplyOptions) printObjects() error { return nil } -func (p *Patcher) delete(namespace, name string) error { - return runDelete(namespace, name, p.Mapping, p.DynamicClient, p.Cascade, p.GracePeriod, p.ServerDryRun) -} - -// Patcher defines options to patch OpenAPI objects. -type Patcher struct { - Mapping *meta.RESTMapping - Helper *resource.Helper - DynamicClient dynamic.Interface - - Overwrite bool - BackOff clockwork.Clock - - Force bool - Cascade bool - Timeout time.Duration - GracePeriod int - ServerDryRun bool - - // If set, forces the patch against a specific resourceVersion - ResourceVersion *string - - // Number of retries to make if the patch fails with conflict - Retries int - - OpenapiSchema openapi.Resources -} - // DryRunVerifier verifies if a given group-version-kind supports DryRun // against the current server. Sending dryRun requests to apiserver that // don't support it will result in objects being unwillingly persisted. @@ -702,165 +637,3 @@ func (v *DryRunVerifier) HasSupport(gvk schema.GroupVersionKind) error { } return nil } - -func addResourceVersion(patch []byte, rv string) ([]byte, error) { - var patchMap map[string]interface{} - err := json.Unmarshal(patch, &patchMap) - if err != nil { - return nil, err - } - u := unstructured.Unstructured{Object: patchMap} - a, err := meta.Accessor(&u) - if err != nil { - return nil, err - } - a.SetResourceVersion(rv) - - return json.Marshal(patchMap) -} - -func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { - // Serialize the current configuration of the object from the server. - current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err) - } - - // Retrieve the original configuration of the object from the annotation. - original, err := util.GetOriginalConfiguration(obj) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err) - } - - var patchType types.PatchType - var patch []byte - var lookupPatchMeta strategicpatch.LookupPatchMeta - var schema oapi.Schema - createPatchErrFormat := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:" - - // Create the versioned struct from the type defined in the restmapping - // (which is the API version we'll be submitting the patch to) - versionedObject, err := scheme.Scheme.New(p.Mapping.GroupVersionKind) - switch { - case runtime.IsNotRegisteredError(err): - // fall back to generic JSON merge patch - patchType = types.MergePatchType - preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"), - mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")} - patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...) - if err != nil { - if mergepatch.IsPreconditionFailed(err) { - return nil, nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed") - } - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) - } - case err != nil: - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("getting instance of versioned object for %v:", p.Mapping.GroupVersionKind), source, err) - case err == nil: - // Compute a three way strategic merge patch to send to server. - patchType = types.StrategicMergePatchType - - // Try to use openapi first if the openapi spec is available and can successfully calculate the patch. - // Otherwise, fall back to baked-in types. - if p.OpenapiSchema != nil { - if schema = p.OpenapiSchema.LookupResource(p.Mapping.GroupVersionKind); schema != nil { - lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema} - if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil { - fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err) - } else { - patchType = types.StrategicMergePatchType - patch = openapiPatch - } - } - } - - if patch == nil { - lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) - } - patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) - } - } - } - - if string(patch) == "{}" { - return patch, obj, nil - } - - if p.ResourceVersion != nil { - patch, err = addResourceVersion(patch, *p.ResourceVersion) - if err != nil { - return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err) - } - } - - options := metav1.PatchOptions{} - if p.ServerDryRun { - options.DryRun = []string{metav1.DryRunAll} - } - - patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, &options) - return patch, patchedObj, err -} - -// Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well -// the final patched object. On failure, returns an error. -func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { - var getErr error - patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut) - if p.Retries == 0 { - p.Retries = maxPatchRetry - } - for i := 1; i <= p.Retries && errors.IsConflict(err); i++ { - if i > triesBeforeBackOff { - p.BackOff.Sleep(backOffPeriod) - } - current, getErr = p.Helper.Get(namespace, name, false) - if getErr != nil { - return nil, nil, getErr - } - patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut) - } - if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force { - patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name) - } - return patchBytes, patchObject, err -} - -func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) { - if err := p.delete(namespace, name); err != nil { - return modified, nil, err - } - // TODO: use wait - if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) { - if _, err := p.Helper.Get(namespace, name, false); !errors.IsNotFound(err) { - return false, err - } - return true, nil - }); err != nil { - return modified, nil, err - } - versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil) - if err != nil { - return modified, nil, err - } - options := metav1.CreateOptions{} - if p.ServerDryRun { - options.DryRun = []string{metav1.DryRunAll} - } - createdObject, err := p.Helper.Create(namespace, true, versionedObject, &options) - if err != nil { - // restore the original object if we fail to create the new one - // but still propagate and advertise error to user - recreated, recreateErr := p.Helper.Create(namespace, true, original, &options) - if recreateErr != nil { - err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr) - } else { - createdObject = recreated - } - } - return modified, createdObject, err -} diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go new file mode 100644 index 00000000000..d606e1de258 --- /dev/null +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go @@ -0,0 +1,265 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apply + +import ( + "encoding/json" + "fmt" + "io" + "time" + + "github.com/jonboulle/clockwork" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/jsonmergepatch" + "k8s.io/apimachinery/pkg/util/mergepatch" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/dynamic" + oapi "k8s.io/kube-openapi/pkg/util/proto" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + "k8s.io/kubectl/pkg/scheme" + "k8s.io/kubectl/pkg/util" + "k8s.io/kubectl/pkg/util/openapi" +) + +const ( + // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure + maxPatchRetry = 5 + // backOffPeriod is the period to back off when apply patch results in error. + backOffPeriod = 1 * time.Second + // how many times we can retry before back off + triesBeforeBackOff = 1 +) + +// Patcher defines options to patch OpenAPI objects. +type Patcher struct { + Mapping *meta.RESTMapping + Helper *resource.Helper + DynamicClient dynamic.Interface + + Overwrite bool + BackOff clockwork.Clock + + Force bool + Cascade bool + Timeout time.Duration + GracePeriod int + ServerDryRun bool + + // If set, forces the patch against a specific resourceVersion + ResourceVersion *string + + // Number of retries to make if the patch fails with conflict + Retries int + + OpenapiSchema openapi.Resources +} + +func newPatcher(o *ApplyOptions, info *resource.Info) *Patcher { + var openapiSchema openapi.Resources + if o.OpenAPIPatch { + openapiSchema = o.OpenAPISchema + } + + helper := resource.NewHelper(info.Client, info.Mapping) + return &Patcher{ + Mapping: info.Mapping, + Helper: helper, + DynamicClient: o.DynamicClient, + Overwrite: o.Overwrite, + BackOff: clockwork.NewRealClock(), + Force: o.DeleteOptions.ForceDeletion, + Cascade: o.DeleteOptions.Cascade, + Timeout: o.DeleteOptions.Timeout, + GracePeriod: o.DeleteOptions.GracePeriod, + ServerDryRun: o.ServerDryRun, + OpenapiSchema: openapiSchema, + Retries: maxPatchRetry, + } +} + +func (p *Patcher) delete(namespace, name string) error { + return runDelete(namespace, name, p.Mapping, p.DynamicClient, p.Cascade, p.GracePeriod, p.ServerDryRun) +} + +func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { + // Serialize the current configuration of the object from the server. + current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err) + } + + // Retrieve the original configuration of the object from the annotation. + original, err := util.GetOriginalConfiguration(obj) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err) + } + + var patchType types.PatchType + var patch []byte + var lookupPatchMeta strategicpatch.LookupPatchMeta + var schema oapi.Schema + createPatchErrFormat := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:" + + // Create the versioned struct from the type defined in the restmapping + // (which is the API version we'll be submitting the patch to) + versionedObject, err := scheme.Scheme.New(p.Mapping.GroupVersionKind) + switch { + case runtime.IsNotRegisteredError(err): + // fall back to generic JSON merge patch + patchType = types.MergePatchType + preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"), + mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")} + patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...) + if err != nil { + if mergepatch.IsPreconditionFailed(err) { + return nil, nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed") + } + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) + } + case err != nil: + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("getting instance of versioned object for %v:", p.Mapping.GroupVersionKind), source, err) + case err == nil: + // Compute a three way strategic merge patch to send to server. + patchType = types.StrategicMergePatchType + + // Try to use openapi first if the openapi spec is available and can successfully calculate the patch. + // Otherwise, fall back to baked-in types. + if p.OpenapiSchema != nil { + if schema = p.OpenapiSchema.LookupResource(p.Mapping.GroupVersionKind); schema != nil { + lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema} + if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil { + fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err) + } else { + patchType = types.StrategicMergePatchType + patch = openapiPatch + } + } + } + + if patch == nil { + lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) + } + patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err) + } + } + } + + if string(patch) == "{}" { + return patch, obj, nil + } + + if p.ResourceVersion != nil { + patch, err = addResourceVersion(patch, *p.ResourceVersion) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err) + } + } + + options := metav1.PatchOptions{} + if p.ServerDryRun { + options.DryRun = []string{metav1.DryRunAll} + } + + patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, &options) + return patch, patchedObj, err +} + +// Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well +// the final patched object. On failure, returns an error. +func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { + var getErr error + patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut) + if p.Retries == 0 { + p.Retries = maxPatchRetry + } + for i := 1; i <= p.Retries && errors.IsConflict(err); i++ { + if i > triesBeforeBackOff { + p.BackOff.Sleep(backOffPeriod) + } + current, getErr = p.Helper.Get(namespace, name, false) + if getErr != nil { + return nil, nil, getErr + } + patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut) + } + if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force { + patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name) + } + return patchBytes, patchObject, err +} + +func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) { + if err := p.delete(namespace, name); err != nil { + return modified, nil, err + } + // TODO: use wait + if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) { + if _, err := p.Helper.Get(namespace, name, false); !errors.IsNotFound(err) { + return false, err + } + return true, nil + }); err != nil { + return modified, nil, err + } + versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil) + if err != nil { + return modified, nil, err + } + options := metav1.CreateOptions{} + if p.ServerDryRun { + options.DryRun = []string{metav1.DryRunAll} + } + createdObject, err := p.Helper.Create(namespace, true, versionedObject, &options) + if err != nil { + // restore the original object if we fail to create the new one + // but still propagate and advertise error to user + recreated, recreateErr := p.Helper.Create(namespace, true, original, &options) + if recreateErr != nil { + err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr) + } else { + createdObject = recreated + } + } + return modified, createdObject, err +} + +func addResourceVersion(patch []byte, rv string) ([]byte, error) { + var patchMap map[string]interface{} + err := json.Unmarshal(patch, &patchMap) + if err != nil { + return nil, err + } + u := unstructured.Unstructured{Object: patchMap} + a, err := meta.Accessor(&u) + if err != nil { + return nil, err + } + a.SetResourceVersion(rv) + + return json.Marshal(patchMap) +}