Use OpenAPI to determine patch type in kubectl apply patching

Currently `kubectl apply` determines correct patch type for given
GVKs by trying to register schema and if it succeeds, it uses
strategic-merge-patch.

But OpenAPI endpoint already stores which patch types are supported
by GVKs. This PR checks OpenAPI endpoint to retrieve patch type,
if OpenAPI is enabled. If it is not enabled, patch type determination
will be done as conventional registration method.
This commit is contained in:
Arda Güçlü 2022-06-08 13:12:16 +03:00
parent 759785ea14
commit cddbb0c563
6 changed files with 123 additions and 44 deletions

View File

@ -39,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
sptest "k8s.io/apimachinery/pkg/util/strategicpatch/testing"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
@ -333,7 +334,11 @@ func readAndAnnotateUnstructured(t *testing.T, filename string) (string, []byte)
return annotateRuntimeObject(t, obj1, obj2, "Widget")
}
func validatePatchApplication(t *testing.T, req *http.Request) {
func validatePatchApplication(t *testing.T, req *http.Request, patchType types.PatchType) {
if got, wanted := req.Header.Get("Content-Type"), string(patchType); got != wanted {
t.Fatalf("unexpected content-type expected: %s but actual %s\n", wanted, got)
}
patch, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatal(err)
@ -624,7 +629,7 @@ func TestApplyObject(t *testing.T) {
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
case p == pathRC && m == "PATCH":
validatePatchApplication(t, req)
validatePatchApplication(t, req, types.StrategicMergePatchType)
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
default:
@ -673,7 +678,7 @@ func TestApplyPruneObjects(t *testing.T) {
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
case p == pathRC && m == "PATCH":
validatePatchApplication(t, req)
validatePatchApplication(t, req, types.StrategicMergePatchType)
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
default:
@ -739,7 +744,7 @@ func TestApplyObjectOutput(t *testing.T) {
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
case p == pathRC && m == "PATCH":
validatePatchApplication(t, req)
validatePatchApplication(t, req, types.StrategicMergePatchType)
bodyRC := ioutil.NopCloser(bytes.NewReader(postPatchData))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
default:
@ -801,7 +806,7 @@ func TestApplyRetry(t *testing.T) {
return &http.Response{StatusCode: http.StatusConflict, Header: cmdtesting.DefaultHeader(), Body: bodyErr}, nil
}
retry = true
validatePatchApplication(t, req)
validatePatchApplication(t, req, types.StrategicMergePatchType)
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
default:
@ -970,14 +975,14 @@ func testApplyMultipleObjects(t *testing.T, asList bool) {
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
case p == pathRC && m == "PATCH":
validatePatchApplication(t, req)
validatePatchApplication(t, req, types.StrategicMergePatchType)
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, nil
case p == pathSVC && m == "GET":
bodySVC := ioutil.NopCloser(bytes.NewReader(currentSVC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodySVC}, nil
case p == pathSVC && m == "PATCH":
validatePatchApplication(t, req)
validatePatchApplication(t, req, types.StrategicMergePatchType)
bodySVC := ioutil.NopCloser(bytes.NewReader(currentSVC))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodySVC}, nil
default:
@ -1131,11 +1136,7 @@ func TestUnstructuredApply(t *testing.T) {
Header: cmdtesting.DefaultHeader(),
Body: body}, nil
case p == path && m == "PATCH":
contentType := req.Header.Get("Content-Type")
if contentType != "application/merge-patch+json" {
t.Fatalf("Unexpected Content-Type: %s", contentType)
}
validatePatchApplication(t, req)
validatePatchApplication(t, req, types.MergePatchType)
verifiedPatch = true
body := ioutil.NopCloser(bytes.NewReader(curr))
@ -1202,7 +1203,6 @@ func TestUnstructuredIdempotentApply(t *testing.T) {
case p == path && m == "PATCH":
// In idempotent updates, kubectl will resolve to an empty patch and not send anything to the server
// Thus, if we reach this branch, kubectl is unnecessarily sending a patch.
patch, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatal(err)

View File

@ -116,36 +116,38 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, namespace, na
return nil, nil, errors.Wrapf(err, "retrieving original configuration from:\n%v\nfor:", obj)
}
var patchType types.PatchType
var patch []byte
patchType := types.StrategicMergePatchType
_, err = scheme.Scheme.New(p.Mapping.GroupVersionKind)
if err != nil {
if !runtime.IsNotRegisteredError(err) {
return nil, nil, errors.Wrapf(err, "getting instance of versioned object for %v:", p.Mapping.GroupVersionKind)
}
patchType = types.MergePatchType
}
switch patchType {
case types.MergePatchType:
patch, err = p.buildMergePatch(original, modified, current)
if err != nil {
return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
}
case types.StrategicMergePatchType:
// Try to use openapi first if the openapi spec is available and can successfully calculate the patch.
// Otherwise, fall back to baked-in types.
if s := p.findOpenAPIResource(p.Mapping.GroupVersionKind); s != nil {
patch, err = p.buildStrategicMergeFromOpenAPI(s, original, modified, current)
if err != nil {
// Warn user about problem and continue strategic merge patching using builtin types.
fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
if p.OpenapiSchema != nil {
// if openapischema is used, we'll try to get required patch type for this GVK from Open API.
// if it fails or could not find any patch type, fall back to baked-in patch type determination.
if patchType, err = p.getPatchTypeFromOpenAPI(p.Mapping.GroupVersionKind); err == nil && patchType == types.StrategicMergePatchType {
if s := p.findOpenAPIResource(p.Mapping.GroupVersionKind); s != nil {
patch, err = p.buildStrategicMergeFromOpenAPI(s, original, modified, current)
if err != nil {
// Warn user about problem and continue strategic merge patching using builtin types.
fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
}
}
}
}
if patch == nil {
patch, err = p.buildStrategicMergeFromBuiltins(p.Mapping.GroupVersionKind, original, modified, current)
if patch == nil {
versionedObj, err := scheme.Scheme.New(p.Mapping.GroupVersionKind)
if err == nil {
patchType = types.StrategicMergePatchType
patch, err = p.buildStrategicMergeFromBuiltins(versionedObj, original, modified, current)
if err != nil {
return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
}
} else {
if !runtime.IsNotRegisteredError(err) {
return nil, nil, errors.Wrapf(err, "getting instance of versioned object for %v:", p.Mapping.GroupVersionKind)
}
patchType = types.MergePatchType
patch, err = p.buildMergePatch(original, modified, current)
if err != nil {
return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
}
@ -202,15 +204,26 @@ func (p *Patcher) findOpenAPIResource(gvk schema.GroupVersionKind) proto.Schema
return p.OpenapiSchema.LookupResource(gvk)
}
// getPatchTypeFromOpenAPI looks up patch types supported by given GroupVersionKind in Open API.
func (p *Patcher) getPatchTypeFromOpenAPI(gvk schema.GroupVersionKind) (types.PatchType, error) {
if pc := p.OpenapiSchema.GetConsumes(p.Mapping.GroupVersionKind, "PATCH"); pc != nil {
for _, c := range pc {
if c == string(types.StrategicMergePatchType) {
return types.StrategicMergePatchType, nil
}
}
return types.MergePatchType, nil
}
return types.MergePatchType, fmt.Errorf("unable to find any patch type for %s in Open API", gvk)
}
// buildStrategicMergeFromStruct builds patch from struct. This is used when
// openapi endpoint is not working or user disables it by setting openapi-patch flag
// to false.
func (p *Patcher) buildStrategicMergeFromBuiltins(gvk schema.GroupVersionKind, original, modified, current []byte) ([]byte, error) {
versionedObject, err := scheme.Scheme.New(gvk)
if err != nil {
return nil, err
}
lookupPatchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObject)
func (p *Patcher) buildStrategicMergeFromBuiltins(versionedObj runtime.Object, original, modified, current []byte) ([]byte, error) {
lookupPatchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObj)
if err != nil {
return nil, err
}

View File

@ -200,6 +200,10 @@ func (f FakeResources) LookupResource(s schema.GroupVersionKind) proto.Schema {
return f.resources[s]
}
func (f FakeResources) GetConsumes(gvk schema.GroupVersionKind, operation string) []string {
return nil
}
var _ openapi.Resources = &FakeResources{}
func testOpenAPISchemaData() (openapi.Resources, error) {

View File

@ -21,12 +21,14 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/yaml"
)
// Resources interface describe a resources provider, that can give you
// resource based on group-version-kind.
type Resources interface {
LookupResource(gvk schema.GroupVersionKind) proto.Schema
GetConsumes(gvk schema.GroupVersionKind, operation string) []string
}
// groupVersionKindExtensionKey is the key used to lookup the
@ -40,6 +42,7 @@ type document struct {
// Maps gvk to model name
resources map[schema.GroupVersionKind]string
models proto.Models
doc *openapi_v2.Document
}
var _ Resources = &document{}
@ -68,6 +71,7 @@ func NewOpenAPIData(doc *openapi_v2.Document) (Resources, error) {
return &document{
resources: resources,
models: models,
doc: doc,
}, nil
}
@ -79,6 +83,44 @@ func (d *document) LookupResource(gvk schema.GroupVersionKind) proto.Schema {
return d.models.LookupModel(modelName)
}
func (d *document) GetConsumes(gvk schema.GroupVersionKind, operation string) []string {
for _, path := range d.doc.GetPaths().GetPath() {
for _, ex := range path.GetValue().GetPatch().GetVendorExtension() {
if ex.GetValue().GetYaml() == "" ||
ex.GetName() != "x-kubernetes-group-version-kind" {
continue
}
var value map[string]string
err := yaml.Unmarshal([]byte(ex.GetValue().GetYaml()), &value)
if err != nil {
continue
}
if value["group"] == gvk.Group && value["kind"] == gvk.Kind && value["version"] == gvk.Version {
switch operation {
case "GET":
return path.GetValue().GetGet().GetConsumes()
case "PATCH":
return path.GetValue().GetPatch().GetConsumes()
case "HEAD":
return path.GetValue().GetHead().GetConsumes()
case "PUT":
return path.GetValue().GetPut().GetConsumes()
case "POST":
return path.GetValue().GetPost().GetConsumes()
case "OPTIONS":
return path.GetValue().GetOptions().GetConsumes()
case "DELETE":
return path.GetValue().GetDelete().GetConsumes()
}
}
}
}
return nil
}
// Get and parse GroupVersionKind from the extension. Returns empty if it doesn't have one.
func parseGroupVersionKind(s proto.Schema) []schema.GroupVersionKind {
extensions := s.GetExtensions()

View File

@ -50,6 +50,9 @@ var _ = Describe("Reading apps/v1/Deployment from openAPIData", func() {
schema = resources.LookupResource(gvk)
Expect(schema).ToNot(BeNil())
Expect(schema.(*proto.Kind)).ToNot(BeNil())
consumes := resources.GetConsumes(gvk, "PATCH")
Expect(consumes).ToNot(BeNil())
Expect(len(consumes)).To(Equal(4))
})
})

View File

@ -53,6 +53,19 @@ func (f *FakeResources) LookupResource(gvk schema.GroupVersionKind) proto.Schema
return resources.LookupResource(gvk)
}
func (f *FakeResources) GetConsumes(gvk schema.GroupVersionKind, operation string) []string {
s, err := f.fake.OpenAPISchema()
if err != nil {
panic(err)
}
resources, err := openapi.NewOpenAPIData(s)
if err != nil {
panic(err)
}
return resources.GetConsumes(gvk, operation)
}
// EmptyResources implement a Resources that just doesn't have any resources.
type EmptyResources struct{}
@ -63,6 +76,10 @@ func (f EmptyResources) LookupResource(gvk schema.GroupVersionKind) proto.Schema
return nil
}
func (f EmptyResources) GetConsumes(gvk schema.GroupVersionKind, operation string) []string {
return nil
}
// CreateOpenAPISchemaFunc returns a function useful for the TestFactory.
func CreateOpenAPISchemaFunc(path string) func() (openapi.Resources, error) {
return func() (openapi.Resources, error) {