mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #26557 from AdoHe/patch_retry
Automatic merge from submit-queue kubectl apply retry stale resource version ```release-note kubectl apply: retry applying a patch if a version conflict error is encountered ``` []() fixes #15493 @pwittrock I just got my original implementation back, ptal.
This commit is contained in:
commit
0d02f8c0f6
@ -31,8 +31,8 @@ type debugError interface {
|
|||||||
|
|
||||||
// GetOriginalConfiguration retrieves the original configuration of the object
|
// GetOriginalConfiguration retrieves the original configuration of the object
|
||||||
// from the annotation, or nil if no annotation was found.
|
// from the annotation, or nil if no annotation was found.
|
||||||
func GetOriginalConfiguration(info *resource.Info) ([]byte, error) {
|
func GetOriginalConfiguration(mapping *meta.RESTMapping, obj runtime.Object) ([]byte, error) {
|
||||||
annots, err := info.Mapping.MetadataAccessor.Annotations(info.Object)
|
annots, err := mapping.MetadataAccessor.Annotations(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -168,7 +168,7 @@ func GetModifiedConfiguration(info *resource.Info, annotate bool, codec runtime.
|
|||||||
// UpdateApplyAnnotation calls CreateApplyAnnotation if the last applied
|
// UpdateApplyAnnotation calls CreateApplyAnnotation if the last applied
|
||||||
// configuration annotation is already present. Otherwise, it does nothing.
|
// configuration annotation is already present. Otherwise, it does nothing.
|
||||||
func UpdateApplyAnnotation(info *resource.Info, codec runtime.Encoder) error {
|
func UpdateApplyAnnotation(info *resource.Info, codec runtime.Encoder) error {
|
||||||
if original, err := GetOriginalConfiguration(info); err != nil || len(original) <= 0 {
|
if original, err := GetOriginalConfiguration(info.Mapping, info.Object); err != nil || len(original) <= 0 {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return CreateApplyAnnotation(info, codec)
|
return CreateApplyAnnotation(info, codec)
|
||||||
|
@ -19,11 +19,14 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jonboulle/clockwork"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
"k8s.io/kubernetes/pkg/kubectl"
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
@ -38,6 +41,15 @@ type ApplyOptions struct {
|
|||||||
Recursive bool
|
Recursive bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
|
||||||
|
maxPatchRetry = 5
|
||||||
|
// backOffPeriod is the period to back off when apply patch resutls in error.
|
||||||
|
backOffPeriod = 1 * time.Second
|
||||||
|
// how many times we can retry before back off
|
||||||
|
triesBeforeBackOff = 1
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
apply_long = `Apply a configuration to a resource by filename or stdin.
|
apply_long = `Apply a configuration to a resource by filename or stdin.
|
||||||
The resource will be created if it doesn't exist yet.
|
The resource will be created if it doesn't exist yet.
|
||||||
@ -154,43 +166,16 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serialize the current configuration of the object from the server.
|
|
||||||
current, err := runtime.Encode(encoder, info.Object)
|
|
||||||
if err != nil {
|
|
||||||
return cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", info), info.Source, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrieve the original configuration of the object from the annotation.
|
|
||||||
original, err := kubectl.GetOriginalConfiguration(info)
|
|
||||||
if err != nil {
|
|
||||||
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", info), info.Source, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the versioned struct from the original from the server for
|
|
||||||
// strategic patch.
|
|
||||||
// TODO: Move all structs in apply to use raw data. Can be done once
|
|
||||||
// builder has a RawResult method which delivers raw data instead of
|
|
||||||
// internal objects.
|
|
||||||
versionedObject, _, err := decoder.Decode(current, nil, nil)
|
|
||||||
if err != nil {
|
|
||||||
return cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", info), info.Source, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compute a three way strategic merge patch to send to server.
|
|
||||||
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
|
|
||||||
if err != nil {
|
|
||||||
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfrom:\n%v\nfor:"
|
|
||||||
return cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current, info), info.Source, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
helper := resource.NewHelper(info.Client, info.Mapping)
|
helper := resource.NewHelper(info.Client, info.Mapping)
|
||||||
_, err = helper.Patch(info.Namespace, info.Name, api.StrategicMergePatchType, patch)
|
patcher := NewPatcher(encoder, decoder, info.Mapping, helper)
|
||||||
|
|
||||||
|
patchBytes, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patch, info), info.Source, err)
|
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmdutil.ShouldRecord(cmd, info) {
|
if cmdutil.ShouldRecord(cmd, info) {
|
||||||
patch, err = cmdutil.ChangeResourcePatch(info, f.Command())
|
patch, err := cmdutil.ChangeResourcePatch(info, f.Command())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -215,3 +200,74 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patcher struct {
|
||||||
|
encoder runtime.Encoder
|
||||||
|
decoder runtime.Decoder
|
||||||
|
|
||||||
|
mapping *meta.RESTMapping
|
||||||
|
helper *resource.Helper
|
||||||
|
|
||||||
|
backOff clockwork.Clock
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPatcher(encoder runtime.Encoder, decoder runtime.Decoder, mapping *meta.RESTMapping, helper *resource.Helper) *patcher {
|
||||||
|
return &patcher{
|
||||||
|
encoder: encoder,
|
||||||
|
decoder: decoder,
|
||||||
|
mapping: mapping,
|
||||||
|
helper: helper,
|
||||||
|
backOff: clockwork.NewRealClock(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
|
||||||
|
// Serialize the current configuration of the object from the server.
|
||||||
|
current, err := runtime.Encode(p.encoder, obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the original configuration of the object from the annotation.
|
||||||
|
original, err := kubectl.GetOriginalConfiguration(p.mapping, obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the versioned struct from the original from the server for
|
||||||
|
// strategic patch.
|
||||||
|
// TODO: Move all structs in apply to use raw data. Can be done once
|
||||||
|
// builder has a RawResult method which delivers raw data instead of
|
||||||
|
// internal objects.
|
||||||
|
versionedObject, _, err := p.decoder.Decode(current, nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", obj), source, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute a three way strategic merge patch to send to server.
|
||||||
|
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
|
||||||
|
if err != nil {
|
||||||
|
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
|
||||||
|
return nil, cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current), source, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = p.helper.Patch(namespace, name, api.StrategicMergePatchType, patch)
|
||||||
|
return patch, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *patcher) patch(current runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
|
||||||
|
var getErr error
|
||||||
|
patchBytes, err := p.patchSimple(current, modified, source, namespace, name)
|
||||||
|
for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
|
||||||
|
if i > triesBeforeBackOff {
|
||||||
|
p.backOff.Sleep(backOffPeriod)
|
||||||
|
}
|
||||||
|
current, getErr = p.helper.Get(namespace, name, false)
|
||||||
|
if getErr != nil {
|
||||||
|
return nil, getErr
|
||||||
|
}
|
||||||
|
patchBytes, err = p.patchSimple(current, modified, source, namespace, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return patchBytes, err
|
||||||
|
}
|
||||||
|
@ -19,6 +19,7 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
@ -29,7 +30,9 @@ import (
|
|||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/annotations"
|
"k8s.io/kubernetes/pkg/api/annotations"
|
||||||
|
kubeerr "k8s.io/kubernetes/pkg/api/errors"
|
||||||
"k8s.io/kubernetes/pkg/api/meta"
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/unversioned/fake"
|
"k8s.io/kubernetes/pkg/client/unversioned/fake"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
@ -213,6 +216,61 @@ func TestApplyObject(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestApplyRetry(t *testing.T) {
|
||||||
|
initTestErrorHandler(t)
|
||||||
|
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
|
||||||
|
pathRC := "/namespaces/test/replicationcontrollers/" + nameRC
|
||||||
|
|
||||||
|
firstPatch := true
|
||||||
|
retry := false
|
||||||
|
getCount := 0
|
||||||
|
f, tf, codec := NewAPIFactory()
|
||||||
|
tf.Printer = &testPrinter{}
|
||||||
|
tf.Client = &fake.RESTClient{
|
||||||
|
Codec: codec,
|
||||||
|
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
|
||||||
|
switch p, m := req.URL.Path, req.Method; {
|
||||||
|
case p == pathRC && m == "GET":
|
||||||
|
getCount++
|
||||||
|
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
|
||||||
|
case p == pathRC && m == "PATCH":
|
||||||
|
if firstPatch {
|
||||||
|
firstPatch = false
|
||||||
|
statusErr := kubeerr.NewConflict(unversioned.GroupResource{Group: "", Resource: "rc"}, "test-rc", fmt.Errorf("the object has been modified. Please apply at first."))
|
||||||
|
bodyBytes, _ := json.Marshal(statusErr)
|
||||||
|
bodyErr := ioutil.NopCloser(bytes.NewReader(bodyBytes))
|
||||||
|
return &http.Response{StatusCode: http.StatusConflict, Header: defaultHeader(), Body: bodyErr}, nil
|
||||||
|
}
|
||||||
|
retry = true
|
||||||
|
validatePatchApplication(t, req)
|
||||||
|
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
|
||||||
|
default:
|
||||||
|
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
tf.Namespace = "test"
|
||||||
|
buf := bytes.NewBuffer([]byte{})
|
||||||
|
|
||||||
|
cmd := NewCmdApply(f, buf)
|
||||||
|
cmd.Flags().Set("filename", filenameRC)
|
||||||
|
cmd.Flags().Set("output", "name")
|
||||||
|
cmd.Run(cmd, []string{})
|
||||||
|
|
||||||
|
if !retry || getCount != 2 {
|
||||||
|
t.Fatalf("apply didn't retry when get conflict error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// uses the name from the file, not the response
|
||||||
|
expectRC := "replicationcontroller/" + nameRC + "\n"
|
||||||
|
if buf.String() != expectRC {
|
||||||
|
t.Fatalf("unexpected output: %s\nexpected: %s", buf.String(), expectRC)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestApplyNonExistObject(t *testing.T) {
|
func TestApplyNonExistObject(t *testing.T) {
|
||||||
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
|
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
|
||||||
pathRC := "/namespaces/test/replicationcontrollers"
|
pathRC := "/namespaces/test/replicationcontrollers"
|
||||||
|
Loading…
Reference in New Issue
Block a user