From 89daa462ffd1dc85ebf32529545fbf5d5413b722 Mon Sep 17 00:00:00 2001 From: Antoine Pelisse Date: Fri, 16 Nov 2018 10:32:47 -0800 Subject: [PATCH 1/2] Allow kubectl patcher to patch specific version Give a new "ResourceVersion" option to the patch so that the patch can be forced against a specific version. Also there is no way to customize how many retries the patcher should do on conflicts, so also add a "Retries" option that let's one customize it. --- pkg/kubectl/cmd/apply/apply.go | 36 +++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/pkg/kubectl/cmd/apply/apply.go b/pkg/kubectl/cmd/apply/apply.go index 08919f23b2b..a0fb2dd4ef0 100644 --- a/pkg/kubectl/cmd/apply/apply.go +++ b/pkg/kubectl/cmd/apply/apply.go @@ -17,6 +17,7 @@ limitations under the License. package apply import ( + "encoding/json" "fmt" "io" "strings" @@ -436,6 +437,7 @@ func (o *ApplyOptions) Run() error { GracePeriod: o.DeleteOptions.GracePeriod, ServerDryRun: o.ServerDryRun, OpenapiSchema: openapiSchema, + Retries: maxPatchRetry, } patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut) @@ -699,6 +701,12 @@ type Patcher struct { GracePeriod int ServerDryRun bool + // If set, forces the patch against a specific resourceVersion + ResourceVersion *string + + // Number of retries to make if the patch fails with conflict + Retries int + OpenapiSchema openapi.Resources } @@ -741,6 +749,22 @@ func (v *DryRunVerifier) HasSupport(gvk schema.GroupVersionKind) error { return nil } +func addResourceVersion(patch []byte, rv string) ([]byte, error) { + var patchMap map[string]interface{} + err := json.Unmarshal(patch, &patchMap) + if err != nil { + return nil, err + } + u := unstructured.Unstructured{Object: patchMap} + a, err := meta.Accessor(&u) + if err != nil { + return nil, err + } + a.SetResourceVersion(rv) + + return json.Marshal(patchMap) +} + func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { // Serialize the current configuration of the object from the server. current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) @@ -812,6 +836,13 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names return patch, obj, nil } + if p.ResourceVersion != nil { + patch, err = addResourceVersion(patch, *p.ResourceVersion) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err) + } + } + options := metav1.UpdateOptions{} if p.ServerDryRun { options.DryRun = []string{metav1.DryRunAll} @@ -824,7 +855,10 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { var getErr error patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut) - for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ { + if p.Retries == 0 { + p.Retries = maxPatchRetry + } + for i := 1; i <= p.Retries && errors.IsConflict(err); i++ { if i > triesBeforeBackOff { p.BackOff.Sleep(backOffPeriod) } From a889f37505b2cc3b6feb6ab83d889ef363268a50 Mon Sep 17 00:00:00 2001 From: Antoine Pelisse Date: Fri, 16 Nov 2018 13:16:09 -0800 Subject: [PATCH 2/2] Optimistic-locking on diff There is currently a race-condition when diffing, where we get the object and then run a server-side dry-run patch and compare the two results. If something changes the object on the server between the get and the patch, the diff is going to show unrelated changes. We can now specify the exact revisionversion that we want to patch, and that will return a conflict, and we can retry multiple times to get a non-conflicting diff. Eventually (after 3 times), we diff without checking the version and throw a warning that the diff might be partially wrong. --- pkg/kubectl/cmd/diff/BUILD | 2 + pkg/kubectl/cmd/diff/diff.go | 72 +++++++++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/pkg/kubectl/cmd/diff/BUILD b/pkg/kubectl/cmd/diff/BUILD index 4a99f161251..93e0fb64ebd 100644 --- a/pkg/kubectl/cmd/diff/BUILD +++ b/pkg/kubectl/cmd/diff/BUILD @@ -14,6 +14,7 @@ go_library( "//pkg/kubectl/util/i18n:go_default_library", "//pkg/kubectl/util/templates:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", @@ -21,6 +22,7 @@ go_library( "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library", "//vendor/github.com/jonboulle/clockwork:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", + "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], diff --git a/pkg/kubectl/cmd/diff/diff.go b/pkg/kubectl/cmd/diff/diff.go index 9c52dbfb6b4..89ee7f360ee 100644 --- a/pkg/kubectl/cmd/diff/diff.go +++ b/pkg/kubectl/cmd/diff/diff.go @@ -26,11 +26,13 @@ import ( "github.com/jonboulle/clockwork" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/klog" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/apply" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" @@ -60,6 +62,9 @@ var ( cat service.yaml | kubectl diff -f -`)) ) +// Number of times we try to diff before giving-up +const maxRetries = 4 + type DiffOptions struct { FilenameOptions resource.FilenameOptions } @@ -228,6 +233,7 @@ type InfoObject struct { Info *resource.Info Encoder runtime.Encoder OpenAPI openapi.Resources + Force bool } var _ Object = &InfoObject{} @@ -251,6 +257,16 @@ func (obj InfoObject) Merged() (runtime.Object, error) { ) } + var resourceVersion *string + if !obj.Force { + accessor, err := meta.Accessor(obj.Info.Object) + if err != nil { + return nil, err + } + str := accessor.GetResourceVersion() + resourceVersion = &str + } + modified, err := kubectl.GetModifiedConfiguration(obj.LocalObj, false, unstructured.UnstructuredJSONScheme) if err != nil { return nil, err @@ -259,12 +275,13 @@ func (obj InfoObject) Merged() (runtime.Object, error) { // This is using the patcher from apply, to keep the same behavior. // We plan on replacing this with server-side apply when it becomes available. patcher := &apply.Patcher{ - Mapping: obj.Info.Mapping, - Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping), - Overwrite: true, - BackOff: clockwork.NewRealClock(), - ServerDryRun: true, - OpenapiSchema: obj.OpenAPI, + Mapping: obj.Info.Mapping, + Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping), + Overwrite: true, + BackOff: clockwork.NewRealClock(), + ServerDryRun: true, + OpenapiSchema: obj.OpenAPI, + ResourceVersion: resourceVersion, } _, result, err := patcher.Patch(obj.Info.Object, modified, obj.Info.Source, obj.Info.Namespace, obj.Info.Name, nil) @@ -319,6 +336,10 @@ func (d *Differ) TearDown() { d.To.Dir.Delete() // Ignore error } +func isConflict(err error) bool { + return err != nil && errors.IsConflict(err) +} + // RunDiff uses the factory to parse file arguments, find the version to // diff, and find each Info object for each files, and runs against the // differ. @@ -376,21 +397,36 @@ func RunDiff(f cmdutil.Factory, diff *DiffProgram, options *DiffOptions) error { } local := info.Object.DeepCopyObject() - if err := info.Get(); err != nil { - if !errors.IsNotFound(err) { - return err + for i := 1; i <= maxRetries; i++ { + if err = info.Get(); err != nil { + if !errors.IsNotFound(err) { + return err + } + info.Object = nil } - info.Object = nil - } - obj := InfoObject{ - LocalObj: local, - Info: info, - Encoder: scheme.DefaultJSONEncoder(), - OpenAPI: schema, - } + force := i == maxRetries + if force { + klog.Warningf( + "Object (%v: %v) keeps changing, diffing without lock", + info.Object.GetObjectKind().GroupVersionKind(), + info.Name, + ) + } + obj := InfoObject{ + LocalObj: local, + Info: info, + Encoder: scheme.DefaultJSONEncoder(), + OpenAPI: schema, + Force: force, + } - return differ.Diff(obj, printer) + err = differ.Diff(obj, printer) + if !isConflict(err) { + break + } + } + return err }) if err != nil { return err