mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #110619 from ardaguclu/split-patching
(kubectl apply): Split patching types into functions and refactorings
This commit is contained in:
commit
b5f202f0ed
@ -23,6 +23,7 @@ require (
|
||||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6
|
||||
github.com/onsi/ginkgo v1.14.0
|
||||
github.com/onsi/gomega v1.10.1
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/russross/blackfriday v1.5.2
|
||||
github.com/spf13/cobra v1.4.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
@ -77,7 +78,6 @@ require (
|
||||
github.com/nxadm/tail v1.4.4 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca // indirect
|
||||
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
|
||||
|
@ -22,19 +22,22 @@ import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
|
||||
"k8s.io/apimachinery/pkg/util/mergepatch"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
oapi "k8s.io/kube-openapi/pkg/util/proto"
|
||||
"k8s.io/kube-openapi/pkg/util/proto"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
"k8s.io/kubectl/pkg/scheme"
|
||||
"k8s.io/kubectl/pkg/util"
|
||||
@ -50,6 +53,8 @@ const (
|
||||
triesBeforeBackOff = 1
|
||||
)
|
||||
|
||||
var createPatchErrFormat = "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
|
||||
|
||||
// Patcher defines options to patch OpenAPI objects.
|
||||
type Patcher struct {
|
||||
Mapping *meta.RESTMapping
|
||||
@ -98,69 +103,51 @@ func (p *Patcher) delete(namespace, name string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
|
||||
func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
|
||||
// Serialize the current configuration of the object from the server.
|
||||
current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
|
||||
if err != nil {
|
||||
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
|
||||
return nil, nil, errors.Wrapf(err, "serializing current configuration from:\n%v\nfor:", obj)
|
||||
}
|
||||
|
||||
// Retrieve the original configuration of the object from the annotation.
|
||||
original, err := util.GetOriginalConfiguration(obj)
|
||||
if err != nil {
|
||||
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
|
||||
return nil, nil, errors.Wrapf(err, "retrieving original configuration from:\n%v\nfor:", obj)
|
||||
}
|
||||
|
||||
var patchType types.PatchType
|
||||
var patch []byte
|
||||
var lookupPatchMeta strategicpatch.LookupPatchMeta
|
||||
var schema oapi.Schema
|
||||
createPatchErrFormat := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
|
||||
|
||||
// Create the versioned struct from the type defined in the restmapping
|
||||
// (which is the API version we'll be submitting the patch to)
|
||||
versionedObject, err := scheme.Scheme.New(p.Mapping.GroupVersionKind)
|
||||
switch {
|
||||
case runtime.IsNotRegisteredError(err):
|
||||
// fall back to generic JSON merge patch
|
||||
patchType = types.MergePatchType
|
||||
preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
|
||||
mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
|
||||
patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
|
||||
if err != nil {
|
||||
if mergepatch.IsPreconditionFailed(err) {
|
||||
return nil, nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
|
||||
}
|
||||
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
|
||||
patchType := types.StrategicMergePatchType
|
||||
_, err = scheme.Scheme.New(p.Mapping.GroupVersionKind)
|
||||
if err != nil {
|
||||
if !runtime.IsNotRegisteredError(err) {
|
||||
return nil, nil, errors.Wrapf(err, "getting instance of versioned object for %v:", p.Mapping.GroupVersionKind)
|
||||
}
|
||||
case err != nil:
|
||||
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("getting instance of versioned object for %v:", p.Mapping.GroupVersionKind), source, err)
|
||||
case err == nil:
|
||||
// Compute a three way strategic merge patch to send to server.
|
||||
patchType = types.StrategicMergePatchType
|
||||
patchType = types.MergePatchType
|
||||
}
|
||||
|
||||
switch patchType {
|
||||
case types.MergePatchType:
|
||||
patch, err = p.buildMergePatch(original, modified, current)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
|
||||
}
|
||||
case types.StrategicMergePatchType:
|
||||
// Try to use openapi first if the openapi spec is available and can successfully calculate the patch.
|
||||
// Otherwise, fall back to baked-in types.
|
||||
if p.OpenapiSchema != nil {
|
||||
if schema = p.OpenapiSchema.LookupResource(p.Mapping.GroupVersionKind); schema != nil {
|
||||
lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
|
||||
if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
|
||||
fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
|
||||
} else {
|
||||
patchType = types.StrategicMergePatchType
|
||||
patch = openapiPatch
|
||||
}
|
||||
if s := p.findOpenAPIResource(p.Mapping.GroupVersionKind); s != nil {
|
||||
patch, err = p.buildStrategicMergeFromOpenAPI(s, original, modified, current)
|
||||
if err != nil {
|
||||
// Warn user about problem and continue strategic merge patching using builtin types.
|
||||
fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
if patch == nil {
|
||||
lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
|
||||
patch, err = p.buildStrategicMergeFromBuiltins(p.Mapping.GroupVersionKind, original, modified, current)
|
||||
if err != nil {
|
||||
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
|
||||
}
|
||||
patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite)
|
||||
if err != nil {
|
||||
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
|
||||
return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -172,7 +159,7 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names
|
||||
if p.ResourceVersion != nil {
|
||||
patch, err = addResourceVersion(patch, *p.ResourceVersion)
|
||||
if err != nil {
|
||||
return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err)
|
||||
return nil, nil, errors.Wrap(err, "Failed to insert resourceVersion in patch")
|
||||
}
|
||||
}
|
||||
|
||||
@ -180,15 +167,70 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names
|
||||
return patch, patchedObj, err
|
||||
}
|
||||
|
||||
// buildMergePatch builds patch according to the JSONMergePatch which is used for
|
||||
// custom resource definitions.
|
||||
func (p *Patcher) buildMergePatch(original, modified, current []byte) ([]byte, error) {
|
||||
preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
|
||||
mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
|
||||
patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
|
||||
if err != nil {
|
||||
if mergepatch.IsPreconditionFailed(err) {
|
||||
return nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return patch, nil
|
||||
}
|
||||
|
||||
// buildStrategicMergeFromOpenAPI builds patch from OpenAPI if it is enabled.
|
||||
// This is used for core types which is published in openapi.
|
||||
func (p *Patcher) buildStrategicMergeFromOpenAPI(schema proto.Schema, original, modified, current []byte) ([]byte, error) {
|
||||
lookupPatchMeta := strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
|
||||
if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return openapiPatch, nil
|
||||
}
|
||||
}
|
||||
|
||||
// findOpenAPIResource finds schema of GVK in OpenAPI endpoint.
|
||||
func (p *Patcher) findOpenAPIResource(gvk schema.GroupVersionKind) proto.Schema {
|
||||
if p.OpenapiSchema == nil {
|
||||
return nil
|
||||
}
|
||||
return p.OpenapiSchema.LookupResource(gvk)
|
||||
}
|
||||
|
||||
// buildStrategicMergeFromStruct builds patch from struct. This is used when
|
||||
// openapi endpoint is not working or user disables it by setting openapi-patch flag
|
||||
// to false.
|
||||
func (p *Patcher) buildStrategicMergeFromBuiltins(gvk schema.GroupVersionKind, original, modified, current []byte) ([]byte, error) {
|
||||
versionedObject, err := scheme.Scheme.New(gvk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lookupPatchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return patch, nil
|
||||
}
|
||||
|
||||
// Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well
|
||||
// the final patched object. On failure, returns an error.
|
||||
func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
|
||||
var getErr error
|
||||
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
|
||||
patchBytes, patchObject, err := p.patchSimple(current, modified, namespace, name, errOut)
|
||||
if p.Retries == 0 {
|
||||
p.Retries = maxPatchRetry
|
||||
}
|
||||
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
|
||||
for i := 1; i <= p.Retries && apierrors.IsConflict(err); i++ {
|
||||
if i > triesBeforeBackOff {
|
||||
p.BackOff.Sleep(backOffPeriod)
|
||||
}
|
||||
@ -196,10 +238,14 @@ func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespa
|
||||
if getErr != nil {
|
||||
return nil, nil, getErr
|
||||
}
|
||||
patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
|
||||
patchBytes, patchObject, err = p.patchSimple(current, modified, namespace, name, errOut)
|
||||
}
|
||||
if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force {
|
||||
patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
|
||||
if err != nil {
|
||||
if (apierrors.IsConflict(err) || apierrors.IsInvalid(err)) && p.Force {
|
||||
patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
|
||||
} else {
|
||||
err = cmdutil.AddSourceToErr("patching", source, err)
|
||||
}
|
||||
}
|
||||
return patchBytes, patchObject, err
|
||||
}
|
||||
@ -210,7 +256,7 @@ func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, name
|
||||
}
|
||||
// TODO: use wait
|
||||
if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) {
|
||||
if _, err := p.Helper.Get(namespace, name); !errors.IsNotFound(err) {
|
||||
if _, err := p.Helper.Get(namespace, name); !apierrors.IsNotFound(err) {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
|
Loading…
Reference in New Issue
Block a user