Support application/apply-patch+cbor in patch requests.

This commit is contained in:
Ben Luddy 2024-10-22 16:08:24 -04:00
parent f01e0d64db
commit 37ed906a33
No known key found for this signature in database
GPG Key ID: A6551E73A5974C30
18 changed files with 481 additions and 156 deletions

View File

@ -325,7 +325,10 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
supportedTypes := []string{ supportedTypes := []string{
string(types.JSONPatchType), string(types.JSONPatchType),
string(types.MergePatchType), string(types.MergePatchType),
string(types.ApplyPatchType), string(types.ApplyYAMLPatchType),
}
if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) {
supportedTypes = append(supportedTypes, string(types.ApplyCBORPatchType))
} }
var handlerFunc http.HandlerFunc var handlerFunc http.HandlerFunc

View File

@ -186,15 +186,16 @@ func ValidateUpdateOptions(options *metav1.UpdateOptions) field.ErrorList {
func ValidatePatchOptions(options *metav1.PatchOptions, patchType types.PatchType) field.ErrorList { func ValidatePatchOptions(options *metav1.PatchOptions, patchType types.PatchType) field.ErrorList {
allErrs := field.ErrorList{} allErrs := field.ErrorList{}
if patchType != types.ApplyPatchType { switch patchType {
if options.Force != nil { case types.ApplyYAMLPatchType, types.ApplyCBORPatchType:
allErrs = append(allErrs, field.Forbidden(field.NewPath("force"), "may not be specified for non-apply patch"))
}
} else {
if options.FieldManager == "" { if options.FieldManager == "" {
// This field is defaulted to "kubectl" by kubectl, but HAS TO be explicitly set by controllers. // This field is defaulted to "kubectl" by kubectl, but HAS TO be explicitly set by controllers.
allErrs = append(allErrs, field.Required(field.NewPath("fieldManager"), "is required for apply patch")) allErrs = append(allErrs, field.Required(field.NewPath("fieldManager"), "is required for apply patch"))
} }
default:
if options.Force != nil {
allErrs = append(allErrs, field.Forbidden(field.NewPath("force"), "may not be specified for non-apply patch"))
}
} }
allErrs = append(allErrs, ValidateFieldManager(options.FieldManager, field.NewPath("fieldManager"))...) allErrs = append(allErrs, ValidateFieldManager(options.FieldManager, field.NewPath("fieldManager"))...)
allErrs = append(allErrs, ValidateDryRun(field.NewPath("dryRun"), options.DryRun)...) allErrs = append(allErrs, ValidateDryRun(field.NewPath("dryRun"), options.DryRun)...)

View File

@ -141,12 +141,23 @@ func TestValidPatchOptions(t *testing.T) {
Force: boolPtr(true), Force: boolPtr(true),
FieldManager: "kubectl", FieldManager: "kubectl",
}, },
patchType: types.ApplyPatchType, patchType: types.ApplyYAMLPatchType,
}, { }, {
opts: metav1.PatchOptions{ opts: metav1.PatchOptions{
FieldManager: "kubectl", FieldManager: "kubectl",
}, },
patchType: types.ApplyPatchType, patchType: types.ApplyYAMLPatchType,
}, {
opts: metav1.PatchOptions{
Force: boolPtr(true),
FieldManager: "kubectl",
},
patchType: types.ApplyCBORPatchType,
}, {
opts: metav1.PatchOptions{
FieldManager: "kubectl",
},
patchType: types.ApplyCBORPatchType,
}, { }, {
opts: metav1.PatchOptions{}, opts: metav1.PatchOptions{},
patchType: types.MergePatchType, patchType: types.MergePatchType,
@ -175,7 +186,12 @@ func TestInvalidPatchOptions(t *testing.T) {
// missing manager // missing manager
{ {
opts: metav1.PatchOptions{}, opts: metav1.PatchOptions{},
patchType: types.ApplyPatchType, patchType: types.ApplyYAMLPatchType,
},
// missing manager
{
opts: metav1.PatchOptions{},
patchType: types.ApplyCBORPatchType,
}, },
// force on non-apply // force on non-apply
{ {

View File

@ -25,5 +25,7 @@ const (
JSONPatchType PatchType = "application/json-patch+json" JSONPatchType PatchType = "application/json-patch+json"
MergePatchType PatchType = "application/merge-patch+json" MergePatchType PatchType = "application/merge-patch+json"
StrategicMergePatchType PatchType = "application/strategic-merge-patch+json" StrategicMergePatchType PatchType = "application/strategic-merge-patch+json"
ApplyPatchType PatchType = "application/apply-patch+yaml" ApplyPatchType PatchType = ApplyYAMLPatchType
ApplyYAMLPatchType PatchType = "application/apply-patch+yaml"
ApplyCBORPatchType PatchType = "application/apply-patch+cbor"
) )

View File

@ -35,9 +35,11 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
cbor "k8s.io/apimachinery/pkg/runtime/serializer/cbor/direct"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/managedfields" "k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/apimachinery/pkg/util/mergepatch" "k8s.io/apimachinery/pkg/util/mergepatch"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
@ -50,8 +52,10 @@ import (
requestmetrics "k8s.io/apiserver/pkg/endpoints/handlers/metrics" requestmetrics "k8s.io/apiserver/pkg/endpoints/handlers/metrics"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun" "k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing" "k8s.io/component-base/tracing"
) )
@ -129,10 +133,25 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
audit.LogRequestPatch(req.Context(), patchBytes) audit.LogRequestPatch(req.Context(), patchBytes)
span.AddEvent("Recorded the audit event") span.AddEvent("Recorded the audit event")
baseContentType := runtime.ContentTypeJSON var baseContentType string
if patchType == types.ApplyPatchType { switch patchType {
case types.ApplyYAMLPatchType:
baseContentType = runtime.ContentTypeYAML baseContentType = runtime.ContentTypeYAML
case types.ApplyCBORPatchType:
if !utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) {
// This request should have already been rejected by the
// Content-Type allowlist check. Return 500 because assumptions are
// already broken and the feature is not GA.
utilruntime.HandleErrorWithContext(req.Context(), nil, "The patch content-type allowlist check should have made this unreachable.")
scope.err(errors.NewInternalError(errors.NewInternalError(fmt.Errorf("unexpected patch type: %v", patchType))), w, req)
return
}
baseContentType = runtime.ContentTypeCBOR
default:
baseContentType = runtime.ContentTypeJSON
} }
s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), baseContentType) s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), baseContentType)
if !ok { if !ok {
scope.err(fmt.Errorf("no serializer defined for %v", baseContentType), w, req) scope.err(fmt.Errorf("no serializer defined for %v", baseContentType), w, req)
@ -452,6 +471,20 @@ func (p *smpPatcher) createNewObject(_ context.Context) (runtime.Object, error)
return nil, errors.NewNotFound(p.resource.GroupResource(), p.name) return nil, errors.NewNotFound(p.resource.GroupResource(), p.name)
} }
func newApplyPatcher(p *patcher, fieldManager *managedfields.FieldManager, unmarshalFn, unmarshalStrictFn func([]byte, interface{}) error) *applyPatcher {
return &applyPatcher{
fieldManager: fieldManager,
patch: p.patchBytes,
options: p.options,
creater: p.creater,
kind: p.kind,
userAgent: p.userAgent,
validationDirective: p.validationDirective,
unmarshalFn: unmarshalFn,
unmarshalStrictFn: unmarshalStrictFn,
}
}
type applyPatcher struct { type applyPatcher struct {
patch []byte patch []byte
options *metav1.PatchOptions options *metav1.PatchOptions
@ -460,6 +493,8 @@ type applyPatcher struct {
fieldManager *managedfields.FieldManager fieldManager *managedfields.FieldManager
userAgent string userAgent string
validationDirective string validationDirective string
unmarshalFn func(data []byte, v interface{}) error
unmarshalStrictFn func(data []byte, v interface{}) error
} }
func (p *applyPatcher) applyPatchToCurrentObject(requestContext context.Context, obj runtime.Object) (runtime.Object, error) { func (p *applyPatcher) applyPatchToCurrentObject(requestContext context.Context, obj runtime.Object) (runtime.Object, error) {
@ -472,7 +507,7 @@ func (p *applyPatcher) applyPatchToCurrentObject(requestContext context.Context,
} }
patchObj := &unstructured.Unstructured{Object: map[string]interface{}{}} patchObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(p.patch, &patchObj.Object); err != nil { if err := p.unmarshalFn(p.patch, &patchObj.Object); err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error decoding YAML: %v", err)) return nil, errors.NewBadRequest(fmt.Sprintf("error decoding YAML: %v", err))
} }
@ -484,7 +519,7 @@ func (p *applyPatcher) applyPatchToCurrentObject(requestContext context.Context,
// TODO: spawn something to track deciding whether a fieldValidation=Strict // TODO: spawn something to track deciding whether a fieldValidation=Strict
// fatal error should return before an error from the apply operation // fatal error should return before an error from the apply operation
if p.validationDirective == metav1.FieldValidationStrict || p.validationDirective == metav1.FieldValidationWarn { if p.validationDirective == metav1.FieldValidationStrict || p.validationDirective == metav1.FieldValidationWarn {
if err := yaml.UnmarshalStrict(p.patch, &map[string]interface{}{}); err != nil { if err := p.unmarshalStrictFn(p.patch, &map[string]interface{}{}); err != nil {
if p.validationDirective == metav1.FieldValidationStrict { if p.validationDirective == metav1.FieldValidationStrict {
return nil, errors.NewBadRequest(fmt.Sprintf("error strict decoding YAML: %v", err)) return nil, errors.NewBadRequest(fmt.Sprintf("error strict decoding YAML: %v", err))
} }
@ -634,16 +669,21 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
fieldManager: scope.FieldManager, fieldManager: scope.FieldManager,
} }
// this case is unreachable if ServerSideApply is not enabled because we will have already rejected the content type // this case is unreachable if ServerSideApply is not enabled because we will have already rejected the content type
case types.ApplyPatchType: case types.ApplyYAMLPatchType:
p.mechanism = &applyPatcher{ p.mechanism = newApplyPatcher(p, scope.FieldManager, yaml.Unmarshal, yaml.UnmarshalStrict)
fieldManager: scope.FieldManager, p.forceAllowCreate = true
patch: p.patchBytes, case types.ApplyCBORPatchType:
options: p.options, if !utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) {
creater: p.creater, utilruntime.HandleErrorWithContext(context.TODO(), nil, "CBOR apply requests should be rejected before reaching this point unless the feature gate is enabled.")
kind: p.kind, return nil, false, fmt.Errorf("%v: unimplemented patch type", p.patchType)
userAgent: p.userAgent,
validationDirective: p.validationDirective,
} }
// The strict and non-strict funcs are the same here because any CBOR map with
// duplicate keys is invalid and always rejected outright regardless of strictness
// mode, and unknown field errors can't occur in practice because the type of the
// destination value for unmarshaling an apply configuration is always
// "unstructured".
p.mechanism = newApplyPatcher(p, scope.FieldManager, cbor.Unmarshal, cbor.Unmarshal)
p.forceAllowCreate = true p.forceAllowCreate = true
default: default:
return nil, false, fmt.Errorf("%v: unimplemented patch type", p.patchType) return nil, false, fmt.Errorf("%v: unimplemented patch type", p.patchType)
@ -670,7 +710,7 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
result, err := requestFunc() result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large, // If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again. // it is safe to remove managedFields (which can be large) and try again.
if isTooLargeError(err) && p.patchType != types.ApplyPatchType { if isTooLargeError(err) && p.patchType != types.ApplyYAMLPatchType && p.patchType != types.ApplyCBORPatchType {
if _, accessorErr := meta.Accessor(p.restPatcher.New()); accessorErr == nil { if _, accessorErr := meta.Accessor(p.restPatcher.New()); accessorErr == nil {
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil,
p.applyPatch, p.applyPatch,

View File

@ -875,7 +875,10 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
string(types.JSONPatchType), string(types.JSONPatchType),
string(types.MergePatchType), string(types.MergePatchType),
string(types.StrategicMergePatchType), string(types.StrategicMergePatchType),
string(types.ApplyPatchType), string(types.ApplyYAMLPatchType),
}
if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) {
supportedTypes = append(supportedTypes, string(types.ApplyCBORPatchType))
} }
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulPatchResource(patcher, reqScope, admit, supportedTypes)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulPatchResource(patcher, reqScope, admit, supportedTypes))
handler = utilwarning.AddWarningsHandler(handler, warnings) handler = utilwarning.AddWarningsHandler(handler, warnings)

View File

@ -207,7 +207,7 @@ func TestGoldenRequest(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed to load fixture: %v", err) t.Fatalf("failed to load fixture: %v", err)
} }
if diff := cmp.Diff(got, want); diff != "" { if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("unexpected difference from expected bytes:\n%s", diff) t.Errorf("unexpected difference from expected bytes:\n%s", diff)
} }
})) }))

View File

@ -25,12 +25,12 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/features" "k8s.io/client-go/features"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/util/apply"
"k8s.io/client-go/util/consistencydetector" "k8s.io/client-go/util/consistencydetector"
"k8s.io/client-go/util/watchlist" "k8s.io/client-go/util/watchlist"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -340,10 +340,6 @@ func (c *dynamicResourceClient) Apply(ctx context.Context, name string, obj *uns
if err := validateNamespaceWithOptionalName(c.namespace, name); err != nil { if err := validateNamespaceWithOptionalName(c.namespace, name); err != nil {
return nil, err return nil, err
} }
outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
accessor, err := meta.Accessor(obj) accessor, err := meta.Accessor(obj)
if err != nil { if err != nil {
return nil, err return nil, err
@ -355,21 +351,16 @@ func (c *dynamicResourceClient) Apply(ctx context.Context, name string, obj *uns
} }
patchOpts := opts.ToPatchOptions() patchOpts := opts.ToPatchOptions()
result := c.client.client. request, err := apply.NewRequest(c.client.client, obj.Object)
Patch(types.ApplyPatchType).
AbsPath(append(c.makeURLSegments(name), subresources...)...).
Body(outBytes).
SpecificallyVersionedParams(&patchOpts, dynamicParameterCodec, versionV1).
Do(ctx)
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var out unstructured.Unstructured var out unstructured.Unstructured
if err := runtime.DecodeInto(unstructured.UnstructuredJSONScheme, retBytes, &out); err != nil { if err := request.
AbsPath(append(c.makeURLSegments(name), subresources...)...).
SpecificallyVersionedParams(&patchOpts, dynamicParameterCodec, versionV1).
Do(ctx).Into(&out); err != nil {
return nil, err return nil, err
} }
return &out, nil return &out, nil

View File

@ -2,8 +2,8 @@ PATCH /apis/flops/v1alpha1/namespaces/mops/flips/mips/fin?force=true HTTP/1.1
Host: example.com Host: example.com
Accept: application/json Accept: application/json
Accept-Encoding: gzip Accept-Encoding: gzip
Content-Length: 29 Content-Length: 28
Content-Type: application/apply-patch+yaml Content-Type: application/apply-patch+yaml
User-Agent: TestGoldenRequest User-Agent: TestGoldenRequest
{"metadata":{"name":"mips"}} {"metadata":{"name":"mips"}}

View File

@ -2,8 +2,8 @@ PATCH /apis/flops/v1alpha1/namespaces/mops/flips/mips/status?force=true HTTP/1.1
Host: example.com Host: example.com
Accept: application/json Accept: application/json
Accept-Encoding: gzip Accept-Encoding: gzip
Content-Length: 29 Content-Length: 28
Content-Type: application/apply-patch+yaml Content-Type: application/apply-patch+yaml
User-Agent: TestGoldenRequest User-Agent: TestGoldenRequest
{"metadata":{"name":"mips"}} {"metadata":{"name":"mips"}}

View File

@ -18,7 +18,6 @@ package gentype
import ( import (
"context" "context"
json "encoding/json"
"fmt" "fmt"
"time" "time"
@ -27,6 +26,7 @@ import (
types "k8s.io/apimachinery/pkg/types" types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch" watch "k8s.io/apimachinery/pkg/watch"
rest "k8s.io/client-go/rest" rest "k8s.io/client-go/rest"
"k8s.io/client-go/util/apply"
"k8s.io/client-go/util/consistencydetector" "k8s.io/client-go/util/consistencydetector"
"k8s.io/client-go/util/watchlist" "k8s.io/client-go/util/watchlist"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -337,20 +337,21 @@ func (a *alsoApplier[T, C]) Apply(ctx context.Context, obj C, opts metav1.ApplyO
return *new(T), fmt.Errorf("object provided to Apply must not be nil") return *new(T), fmt.Errorf("object provided to Apply must not be nil")
} }
patchOpts := opts.ToPatchOptions() patchOpts := opts.ToPatchOptions()
data, err := json.Marshal(obj)
if err != nil {
return *new(T), err
}
if obj.GetName() == nil { if obj.GetName() == nil {
return *new(T), fmt.Errorf("obj.Name must be provided to Apply") return *new(T), fmt.Errorf("obj.Name must be provided to Apply")
} }
err = a.client.client.Patch(types.ApplyPatchType).
request, err := apply.NewRequest(a.client.client, obj)
if err != nil {
return *new(T), err
}
err = request.
UseProtobufAsDefaultIfPreferred(a.client.prefersProtobuf). UseProtobufAsDefaultIfPreferred(a.client.prefersProtobuf).
NamespaceIfScoped(a.client.namespace, a.client.namespace != ""). NamespaceIfScoped(a.client.namespace, a.client.namespace != "").
Resource(a.client.resource). Resource(a.client.resource).
Name(*obj.GetName()). Name(*obj.GetName()).
VersionedParams(&patchOpts, a.client.parameterCodec). VersionedParams(&patchOpts, a.client.parameterCodec).
Body(data).
Do(ctx). Do(ctx).
Into(result) Into(result)
return result, err return result, err
@ -362,24 +363,24 @@ func (a *alsoApplier[T, C]) ApplyStatus(ctx context.Context, obj C, opts metav1.
return *new(T), fmt.Errorf("object provided to Apply must not be nil") return *new(T), fmt.Errorf("object provided to Apply must not be nil")
} }
patchOpts := opts.ToPatchOptions() patchOpts := opts.ToPatchOptions()
data, err := json.Marshal(obj)
if err != nil {
return *new(T), err
}
if obj.GetName() == nil { if obj.GetName() == nil {
return *new(T), fmt.Errorf("obj.Name must be provided to Apply") return *new(T), fmt.Errorf("obj.Name must be provided to Apply")
} }
request, err := apply.NewRequest(a.client.client, obj)
if err != nil {
return *new(T), err
}
result := a.client.newObject() result := a.client.newObject()
err = a.client.client.Patch(types.ApplyPatchType). err = request.
UseProtobufAsDefaultIfPreferred(a.client.prefersProtobuf). UseProtobufAsDefaultIfPreferred(a.client.prefersProtobuf).
NamespaceIfScoped(a.client.namespace, a.client.namespace != ""). NamespaceIfScoped(a.client.namespace, a.client.namespace != "").
Resource(a.client.resource). Resource(a.client.resource).
Name(*obj.GetName()). Name(*obj.GetName()).
SubResource("status"). SubResource("status").
VersionedParams(&patchOpts, a.client.parameterCodec). VersionedParams(&patchOpts, a.client.parameterCodec).
Body(data).
Do(ctx). Do(ctx).
Into(result) Into(result)
return result, err return result, err

View File

@ -0,0 +1,49 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"fmt"
cbor "k8s.io/apimachinery/pkg/runtime/serializer/cbor/direct"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/features"
"k8s.io/client-go/rest"
)
// NewRequest builds a new server-side apply request. The provided apply configuration object will
// be marshalled to the request's body using the default encoding, and the Content-Type header will
// be set to application/apply-patch with the appropriate structured syntax name suffix (today,
// either +yaml or +cbor, see
// https://www.iana.org/assignments/media-type-structured-suffix/media-type-structured-suffix.xhtml).
func NewRequest(client rest.Interface, applyConfiguration interface{}) (*rest.Request, error) {
pt := types.ApplyYAMLPatchType
marshal := json.Marshal
if features.TestOnlyFeatureGates.Enabled(features.TestOnlyClientAllowsCBOR) && features.TestOnlyFeatureGates.Enabled(features.TestOnlyClientPrefersCBOR) {
pt = types.ApplyCBORPatchType
marshal = cbor.Marshal
}
body, err := marshal(applyConfiguration)
if err != nil {
return nil, fmt.Errorf("failed to marshal apply configuration: %w", err)
}
return client.Patch(pt).Body(body), nil
}

View File

@ -142,7 +142,6 @@ func (g *genClientForType) GenerateType(c *generator.Context, t *types.Type, w i
"ApplyOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "ApplyOptions"}), "ApplyOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "ApplyOptions"}),
"PatchType": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/types", Name: "PatchType"}), "PatchType": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/types", Name: "PatchType"}),
"PatchOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "PatchOptions"}), "PatchOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "PatchOptions"}),
"jsonMarshal": c.Universe.Function(types.Name{Package: "encoding/json", Name: "Marshal"}),
"context": c.Universe.Type(types.Name{Package: "context", Name: "Context"}), "context": c.Universe.Type(types.Name{Package: "context", Name: "Context"}),
}, },
} }
@ -172,11 +171,9 @@ func (g *genClientForType) GenerateType(c *generator.Context, t *types.Type, w i
"ApplyOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "ApplyOptions"}), "ApplyOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "ApplyOptions"}),
"UpdateOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "UpdateOptions"}), "UpdateOptions": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/apis/meta/v1", Name: "UpdateOptions"}),
"PatchType": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/types", Name: "PatchType"}), "PatchType": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/types", Name: "PatchType"}),
"ApplyPatchType": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/types", Name: "ApplyPatchType"}),
"watchInterface": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/watch", Name: "Interface"}), "watchInterface": c.Universe.Type(types.Name{Package: "k8s.io/apimachinery/pkg/watch", Name: "Interface"}),
"RESTClientInterface": c.Universe.Type(types.Name{Package: "k8s.io/client-go/rest", Name: "Interface"}), "RESTClientInterface": c.Universe.Type(types.Name{Package: "k8s.io/client-go/rest", Name: "Interface"}),
"schemeParameterCodec": c.Universe.Variable(types.Name{Package: path.Join(g.clientsetPackage, "scheme"), Name: "ParameterCodec"}), "schemeParameterCodec": c.Universe.Variable(types.Name{Package: path.Join(g.clientsetPackage, "scheme"), Name: "ParameterCodec"}),
"jsonMarshal": c.Universe.Function(types.Name{Package: "encoding/json", Name: "Marshal"}),
"fmtErrorf": c.Universe.Function(types.Name{Package: "fmt", Name: "Errorf"}), "fmtErrorf": c.Universe.Function(types.Name{Package: "fmt", Name: "Errorf"}),
"klogWarningf": c.Universe.Function(types.Name{Package: "k8s.io/klog/v2", Name: "Warningf"}), "klogWarningf": c.Universe.Function(types.Name{Package: "k8s.io/klog/v2", Name: "Warningf"}),
"context": c.Universe.Type(types.Name{Package: "context", Name: "Context"}), "context": c.Universe.Type(types.Name{Package: "context", Name: "Context"}),
@ -186,6 +183,7 @@ func (g *genClientForType) GenerateType(c *generator.Context, t *types.Type, w i
"CheckListFromCacheDataConsistencyIfRequested": c.Universe.Function(types.Name{Package: "k8s.io/client-go/util/consistencydetector", Name: "CheckListFromCacheDataConsistencyIfRequested"}), "CheckListFromCacheDataConsistencyIfRequested": c.Universe.Function(types.Name{Package: "k8s.io/client-go/util/consistencydetector", Name: "CheckListFromCacheDataConsistencyIfRequested"}),
"CheckWatchListFromCacheDataConsistencyIfRequested": c.Universe.Function(types.Name{Package: "k8s.io/client-go/util/consistencydetector", Name: "CheckWatchListFromCacheDataConsistencyIfRequested"}), "CheckWatchListFromCacheDataConsistencyIfRequested": c.Universe.Function(types.Name{Package: "k8s.io/client-go/util/consistencydetector", Name: "CheckWatchListFromCacheDataConsistencyIfRequested"}),
"PrepareWatchListOptionsFromListOptions": c.Universe.Function(types.Name{Package: "k8s.io/client-go/util/watchlist", Name: "PrepareWatchListOptionsFromListOptions"}), "PrepareWatchListOptionsFromListOptions": c.Universe.Function(types.Name{Package: "k8s.io/client-go/util/watchlist", Name: "PrepareWatchListOptionsFromListOptions"}),
"applyNewRequest": c.Universe.Function(types.Name{Package: "k8s.io/client-go/util/apply", Name: "NewRequest"}),
"Client": c.Universe.Type(types.Name{Package: "k8s.io/client-go/gentype", Name: "Client"}), "Client": c.Universe.Type(types.Name{Package: "k8s.io/client-go/gentype", Name: "Client"}),
"ClientWithList": c.Universe.Type(types.Name{Package: "k8s.io/client-go/gentype", Name: "ClientWithList"}), "ClientWithList": c.Universe.Type(types.Name{Package: "k8s.io/client-go/gentype", Name: "ClientWithList"}),
"ClientWithApply": c.Universe.Type(types.Name{Package: "k8s.io/client-go/gentype", Name: "ClientWithApply"}), "ClientWithApply": c.Universe.Type(types.Name{Package: "k8s.io/client-go/gentype", Name: "ClientWithApply"}),
@ -843,22 +841,21 @@ func (c *$.type|privatePlural$) $.verb$(ctx $.context|raw$, $.inputType|private$
return nil, $.fmtErrorf|raw$("$.inputType|private$ provided to $.verb$ must not be nil") return nil, $.fmtErrorf|raw$("$.inputType|private$ provided to $.verb$ must not be nil")
} }
patchOpts := opts.ToPatchOptions() patchOpts := opts.ToPatchOptions()
data, err := $.jsonMarshal|raw$($.inputType|private$) name := $.inputType|private$.Name
if err != nil {
return nil, err
}
name := $.inputType|private$.Name
if name == nil { if name == nil {
return nil, $.fmtErrorf|raw$("$.inputType|private$.Name must be provided to $.verb$") return nil, $.fmtErrorf|raw$("$.inputType|private$.Name must be provided to $.verb$")
} }
request, err := $.applyNewRequest|raw$(c.GetClient(), $.inputType|private$)
if err != nil {
return nil, err
}
result = &$.resultType|raw${} result = &$.resultType|raw${}
err = c.GetClient().Patch($.ApplyPatchType|raw$). err = request.
$if .prefersProtobuf$UseProtobufAsDefault().$end$ $if .prefersProtobuf$UseProtobufAsDefault().$end$
$if .namespaced$Namespace(c.GetNamespace()).$end$ $if .namespaced$Namespace(c.GetNamespace()).$end$
Resource("$.type|resource$"). Resource("$.type|resource$").
Name(*name). Name(*name).
VersionedParams(&patchOpts, $.schemeParameterCodec|raw$). VersionedParams(&patchOpts, $.schemeParameterCodec|raw$).
Body(data).
Do(ctx). Do(ctx).
Into(result) Into(result)
return return
@ -873,20 +870,19 @@ func (c *$.type|privatePlural$) $.verb$(ctx $.context|raw$, $.type|private$Name
return nil, $.fmtErrorf|raw$("$.inputType|private$ provided to $.verb$ must not be nil") return nil, $.fmtErrorf|raw$("$.inputType|private$ provided to $.verb$ must not be nil")
} }
patchOpts := opts.ToPatchOptions() patchOpts := opts.ToPatchOptions()
data, err := $.jsonMarshal|raw$($.inputType|private$) request, err := $.applyNewRequest|raw$(c.GetClient(), $.inputType|private$)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result = &$.resultType|raw${} result = &$.resultType|raw${}
err = c.GetClient().Patch($.ApplyPatchType|raw$). err = request.
$if .prefersProtobuf$UseProtobufAsDefault().$end$ $if .prefersProtobuf$UseProtobufAsDefault().$end$
$if .namespaced$Namespace(c.GetNamespace()).$end$ $if .namespaced$Namespace(c.GetNamespace()).$end$
Resource("$.type|resource$"). Resource("$.type|resource$").
Name($.type|private$Name). Name($.type|private$Name).
SubResource("$.subresourcePath$"). SubResource("$.subresourcePath$").
VersionedParams(&patchOpts, $.schemeParameterCodec|raw$). VersionedParams(&patchOpts, $.schemeParameterCodec|raw$).
Body(data).
Do(ctx). Do(ctx).
Into(result) Into(result)
return return

View File

@ -1033,7 +1033,7 @@ func TestPatchVeryLargeObject(t *testing.T) {
} }
// Applying to the same object should cause managedFields to go over the object size limit, and fail. // Applying to the same object should cause managedFields to go over the object size limit, and fail.
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType). _, err = client.CoreV1().RESTClient().Patch(types.ApplyYAMLPatchType).
Namespace("default"). Namespace("default").
Resource("configmaps"). Resource("configmaps").
Name("large-patch-test-cm"). Name("large-patch-test-cm").
@ -1053,6 +1053,79 @@ func TestPatchVeryLargeObject(t *testing.T) {
} }
} }
// TestPatchVeryLargeObjectCBORApply mirrors TestPatchVeryLargeObject using the +cbor structured
// syntax suffix for application/apply-patch and with CBOR enabled.
func TestPatchVeryLargeObjectCBORApply(t *testing.T) {
framework.EnableCBORServingAndStorageForTest(t)
framework.SetTestOnlyCBORClientFeatureGatesForTest(t, true, false)
client, closeFn := setup(t)
defer closeFn()
cfg := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "large-patch-test-cm",
Namespace: "default",
},
Data: map[string]string{"k": "v"},
}
// Create a small config map.
if _, err := client.CoreV1().ConfigMaps(cfg.Namespace).Create(context.TODO(), cfg, metav1.CreateOptions{}); err != nil {
t.Errorf("unable to create configMap: %v", err)
}
patchString := `{"data":{"k":"v"`
for i := 0; i < 9999; i++ {
unique := fmt.Sprintf("this-key-is-very-long-so-as-to-create-a-very-large-serialized-fieldset-%v", i)
patchString = fmt.Sprintf("%s,%q:%q", patchString, unique, "A")
}
patchString = fmt.Sprintf("%s}}", patchString)
// Should be able to update a small object to be near the object size limit.
_, err := client.CoreV1().RESTClient().Patch(types.MergePatchType).
AbsPath("/api/v1").
Namespace(cfg.Namespace).
Resource("configmaps").
Name(cfg.Name).
Body([]byte(patchString)).Do(context.TODO()).Get()
if err != nil {
t.Errorf("unable to patch configMap: %v", err)
}
// Applying to the same object should cause managedFields to go over the object size limit, and fail.
_, err = client.CoreV1().RESTClient().Patch(types.ApplyYAMLPatchType).
Namespace("default").
Resource("configmaps").
Name("large-patch-test-cm").
Param("fieldManager", "apply_test").
Body([]byte(`{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "large-patch-test-cm",
"namespace": "default",
}
}`)).
Do(context.TODO()).
Get()
if err == nil {
t.Fatalf("expected to fail to update object using Apply patch, but succeeded")
}
_, err = client.CoreV1().RESTClient().Patch(types.ApplyCBORPatchType).
Namespace("default").
Resource("configmaps").
Name("large-patch-test-cm").
Param("fieldManager", "apply_test").
Body([]byte("\xa3\x4aapiVersion\x42v1\x44kind\x49ConfigMap\x48metadata\xa2\x44name\x53large-patch-test-cm\x49namespace\x47default")).
Do(context.TODO()).
Get()
if err == nil {
t.Fatalf("expected to fail to update object using Apply patch (cbor), but succeeded")
}
}
// TestApplyManagedFields makes sure that managedFields api does not change // TestApplyManagedFields makes sure that managedFields api does not change
func TestApplyManagedFields(t *testing.T) { func TestApplyManagedFields(t *testing.T) {
client, closeFn := setup(t) client, closeFn := setup(t)

View File

@ -18,7 +18,6 @@ package apiserver
import ( import (
"context" "context"
"encoding/json"
"strings" "strings"
"testing" "testing"
@ -28,8 +27,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
@ -92,6 +93,19 @@ func createMapping(groupVersion string, resource metav1.APIResource) (*meta.REST
// TestApplyStatus makes sure that applying the status works for all known types. // TestApplyStatus makes sure that applying the status works for all known types.
func TestApplyStatus(t *testing.T) { func TestApplyStatus(t *testing.T) {
testApplyStatus(t, func(testing.TB, *rest.Config) {})
}
// TestApplyStatus makes sure that applying the status works for all known types.
func TestApplyStatusWithCBOR(t *testing.T) {
framework.EnableCBORServingAndStorageForTest(t)
framework.SetTestOnlyCBORClientFeatureGatesForTest(t, true, true)
testApplyStatus(t, func(t testing.TB, config *rest.Config) {
config.Wrap(framework.AssertRequestResponseAsCBOR(t))
})
}
func testApplyStatus(t *testing.T, reconfigureClient func(testing.TB, *rest.Config)) {
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), []string{"--disable-admission-plugins", "ServiceAccount,TaintNodesByCondition"}, framework.SharedEtcd()) server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), []string{"--disable-admission-plugins", "ServiceAccount,TaintNodesByCondition"}, framework.SharedEtcd())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -102,10 +116,6 @@ func TestApplyStatus(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
dynamicClient, err := dynamic.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
// create CRDs so we can make sure that custom resources do not get lost // create CRDs so we can make sure that custom resources do not get lost
etcd.CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(server.ClientConfig), false, etcd.GetCustomResourceDefinitionData()...) etcd.CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(server.ClientConfig), false, etcd.GetCustomResourceDefinitionData()...)
@ -134,7 +144,7 @@ func TestApplyStatus(t *testing.T) {
// both spec and status get wiped for CSRs, // both spec and status get wiped for CSRs,
// nothing is expected to be managed for it, skip it // nothing is expected to be managed for it, skip it
if mapping.Resource.Resource == "certificatesigningrequests" { if mapping.Resource.Resource == "certificatesigningrequests" {
t.Skip() t.SkipNow()
} }
status, ok := statusData[mapping.Resource] status, ok := statusData[mapping.Resource]
@ -159,6 +169,13 @@ func TestApplyStatus(t *testing.T) {
// etcd test stub data doesn't contain apiVersion/kind (!), but apply requires it // etcd test stub data doesn't contain apiVersion/kind (!), but apply requires it
newObj.SetGroupVersionKind(mapping.GroupVersionKind) newObj.SetGroupVersionKind(mapping.GroupVersionKind)
dynamicClientConfig := rest.CopyConfig(server.ClientConfig)
reconfigureClient(t, dynamicClientConfig)
dynamicClient, err := dynamic.NewForConfig(dynamicClientConfig)
if err != nil {
t.Fatal(err)
}
rsc := dynamicClient.Resource(mapping.Resource).Namespace(namespace) rsc := dynamicClient.Resource(mapping.Resource).Namespace(namespace)
// apply to create // apply to create
_, err = rsc.Apply(context.TODO(), name, &newObj, metav1.ApplyOptions{FieldManager: "create_test"}) _, err = rsc.Apply(context.TODO(), name, &newObj, metav1.ApplyOptions{FieldManager: "create_test"})

View File

@ -47,11 +47,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1"
autoscalingv1ac "k8s.io/client-go/applyconfigurations/autoscaling/v1"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1" corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" metav1ac "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/gentype" "k8s.io/client-go/gentype"
clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
clientscheme "k8s.io/client-go/kubernetes/scheme" clientscheme "k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
@ -61,7 +62,6 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
imageutils "k8s.io/kubernetes/test/utils/image" imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/kubernetes/test/utils/ktesting"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1" wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1alpha1client "k8s.io/sample-apiserver/pkg/generated/clientset/versioned/typed/wardle/v1alpha1" wardlev1alpha1client "k8s.io/sample-apiserver/pkg/generated/clientset/versioned/typed/wardle/v1alpha1"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
@ -71,7 +71,7 @@ func TestClient(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
client := clientset.NewForConfigOrDie(result.ClientConfig) client := kubernetes.NewForConfigOrDie(result.ClientConfig)
info, err := client.Discovery().ServerVersion() info, err := client.Discovery().ServerVersion()
if err != nil { if err != nil {
@ -145,7 +145,7 @@ func TestAtomicPut(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
c := clientset.NewForConfigOrDie(result.ClientConfig) c := kubernetes.NewForConfigOrDie(result.ClientConfig)
rcBody := v1.ReplicationController{ rcBody := v1.ReplicationController{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
@ -234,7 +234,7 @@ func TestPatch(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
c := clientset.NewForConfigOrDie(result.ClientConfig) c := kubernetes.NewForConfigOrDie(result.ClientConfig)
name := "patchpod" name := "patchpod"
resource := "pods" resource := "pods"
@ -353,7 +353,7 @@ func TestPatchWithCreateOnUpdate(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
c := clientset.NewForConfigOrDie(result.ClientConfig) c := kubernetes.NewForConfigOrDie(result.ClientConfig)
endpointTemplate := &v1.Endpoints{ endpointTemplate := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -461,7 +461,7 @@ func TestAPIVersions(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
c := clientset.NewForConfigOrDie(result.ClientConfig) c := kubernetes.NewForConfigOrDie(result.ClientConfig)
clientVersion := c.CoreV1().RESTClient().APIVersion().String() clientVersion := c.CoreV1().RESTClient().APIVersion().String()
g, err := c.Discovery().ServerGroups() g, err := c.Discovery().ServerGroups()
@ -483,7 +483,7 @@ func TestEventValidation(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
client := clientset.NewForConfigOrDie(result.ClientConfig) client := kubernetes.NewForConfigOrDie(result.ClientConfig)
createNamespace := func(namespace string) string { createNamespace := func(namespace string) string {
if namespace == "" { if namespace == "" {
@ -591,7 +591,7 @@ func TestEventCompatibility(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
client := clientset.NewForConfigOrDie(result.ClientConfig) client := kubernetes.NewForConfigOrDie(result.ClientConfig)
coreevents := []*v1.Event{ coreevents := []*v1.Event{
{ {
@ -701,7 +701,7 @@ func TestSingleWatch(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
client := clientset.NewForConfigOrDie(result.ClientConfig) client := kubernetes.NewForConfigOrDie(result.ClientConfig)
mkEvent := func(i int) *v1.Event { mkEvent := func(i int) *v1.Event {
name := fmt.Sprintf("event-%v", i) name := fmt.Sprintf("event-%v", i)
@ -785,7 +785,7 @@ func TestMultiWatch(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer result.TearDownFn() defer result.TearDownFn()
client := clientset.NewForConfigOrDie(result.ClientConfig) client := kubernetes.NewForConfigOrDie(result.ClientConfig)
dummyEvent := func(i int) *v1.Event { dummyEvent := func(i int) *v1.Event {
name := fmt.Sprintf("unrelated-%v", i) name := fmt.Sprintf("unrelated-%v", i)
@ -1014,7 +1014,7 @@ func TestApplyWithApplyConfiguration(t *testing.T) {
testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer testServer.TearDownFn() defer testServer.TearDownFn()
c := clientset.NewForConfigOrDie(testServer.ClientConfig) c := kubernetes.NewForConfigOrDie(testServer.ClientConfig)
// Test apply to spec // Test apply to spec
obj, err := c.AppsV1().Deployments("default").Apply(context.TODO(), deployment, metav1.ApplyOptions{FieldManager: "test-mgr", Force: true}) obj, err := c.AppsV1().Deployments("default").Apply(context.TODO(), deployment, metav1.ApplyOptions{FieldManager: "test-mgr", Force: true})
@ -1172,7 +1172,7 @@ func TestExtractModifyApply(t *testing.T) {
testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer testServer.TearDownFn() defer testServer.TearDownFn()
c := clientset.NewForConfigOrDie(testServer.ClientConfig) c := kubernetes.NewForConfigOrDie(testServer.ClientConfig)
deploymentClient := c.AppsV1().Deployments("default") deploymentClient := c.AppsV1().Deployments("default")
fieldMgr := "test-mgr" fieldMgr := "test-mgr"
@ -1244,7 +1244,7 @@ func TestExtractModifyApply(t *testing.T) {
func TestExtractModifyApply_ForceOwnership(t *testing.T) { func TestExtractModifyApply_ForceOwnership(t *testing.T) {
testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer testServer.TearDownFn() defer testServer.TearDownFn()
c := clientset.NewForConfigOrDie(testServer.ClientConfig) c := kubernetes.NewForConfigOrDie(testServer.ClientConfig)
deploymentClient := c.AppsV1().Deployments("default") deploymentClient := c.AppsV1().Deployments("default")
// apply an initial state with one field manager // apply an initial state with one field manager
@ -1364,7 +1364,7 @@ func TestClientCBOREnablement(t *testing.T) {
// Generated clients for built-in types force Protobuf by default. They are tested here to // Generated clients for built-in types force Protobuf by default. They are tested here to
// ensure that the CBOR client feature gates do not interfere with this. // ensure that the CBOR client feature gates do not interfere with this.
DoRequestWithProtobufPreferredGeneratedClient := func(t *testing.T, config *rest.Config) error { DoRequestWithProtobufPreferredGeneratedClient := func(t *testing.T, config *rest.Config) error {
clientset, err := clientset.NewForConfig(config) clientset, err := kubernetes.NewForConfig(config)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1401,7 +1401,7 @@ func TestClientCBOREnablement(t *testing.T) {
} }
DoRequestWithGenericTypedClient := func(t *testing.T, config *rest.Config) error { DoRequestWithGenericTypedClient := func(t *testing.T, config *rest.Config) error {
clientset, err := clientset.NewForConfig(config) clientset, err := kubernetes.NewForConfig(config)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1794,8 +1794,6 @@ func TestClientCBOREnablement(t *testing.T) {
} }
func TestCBORWithTypedClient(t *testing.T) { func TestCBORWithTypedClient(t *testing.T) {
ktesting.SetDefaultVerbosity(10) // todo
framework.EnableCBORServingAndStorageForTest(t) framework.EnableCBORServingAndStorageForTest(t)
framework.SetTestOnlyCBORClientFeatureGatesForTest(t, true, true) framework.SetTestOnlyCBORClientFeatureGatesForTest(t, true, true)
@ -1806,7 +1804,7 @@ func TestCBORWithTypedClient(t *testing.T) {
{ {
// Setup using client with default config. // Setup using client with default config.
clientset, err := clientset.NewForConfig(server.ClientConfig) clientset, err := kubernetes.NewForConfig(server.ClientConfig)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1825,7 +1823,7 @@ func TestCBORWithTypedClient(t *testing.T) {
config.ContentType = "" config.ContentType = ""
config.AcceptContentTypes = "" config.AcceptContentTypes = ""
config.Wrap(framework.AssertRequestResponseAsCBOR(t)) config.Wrap(framework.AssertRequestResponseAsCBOR(t))
clientset, err := clientset.NewForConfig(config) clientset, err := kubernetes.NewForConfig(config)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1933,4 +1931,56 @@ func TestCBORWithTypedClient(t *testing.T) {
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
config = rest.CopyConfig(server.ClientConfig)
// Configuring a non-empty AcceptContentTypes avoids the "default to accepting Protobuf"
// behavior from client-gen's --prefer-protobuf option, which is set when generating all of
// the clients with ApplyScale.
config.AcceptContentTypes = "application/cbor"
config.Wrap(framework.AssertRequestResponseAsCBOR(t))
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
// for Apply, ApplyStatus, and ApplyScale
rsClient := clientset.AppsV1().ReplicaSets(TestNamespace)
rs, err := rsClient.Apply(
context.TODO(),
appsv1ac.ReplicaSet("test-cbor-typed-client", TestNamespace).
WithSpec(appsv1ac.ReplicaSetSpec().
WithReplicas(0).
WithSelector(metav1ac.LabelSelector().WithMatchLabels(map[string]string{"foo": "bar"})).
WithTemplate(corev1ac.PodTemplateSpec().
WithLabels(map[string]string{"foo": "bar"}).
WithSpec(corev1ac.PodSpec().
WithContainers(corev1ac.Container().
WithName("testing").
WithImage("busybox"),
),
),
),
),
metav1.ApplyOptions{FieldManager: "test-cbor-typed-client"},
)
if err != nil {
t.Fatal(err)
}
if _, err := rsClient.ApplyScale(
context.TODO(),
rs.GetName(),
autoscalingv1ac.Scale().WithSpec(autoscalingv1ac.ScaleSpec().WithReplicas(1)),
metav1.ApplyOptions{
FieldManager: "test-cbor-typed-client",
DryRun: []string{metav1.DryRunAll},
Force: true,
},
); err != nil {
t.Fatal(err)
}
if _, err := rsClient.ApplyStatus(context.TODO(), appsv1ac.ReplicaSet(rs.GetName(), rs.GetNamespace()), metav1.ApplyOptions{FieldManager: "test-cbor-typed-client", DryRun: []string{metav1.DryRunAll}}); err != nil {
t.Fatal(err)
}
} }

View File

@ -302,7 +302,54 @@ func unstructuredToEvent(obj *unstructured.Unstructured) (*corev1.Event, error)
} }
func TestDynamicClientCBOREnablement(t *testing.T) { func TestDynamicClientCBOREnablement(t *testing.T) {
for _, tc := range []struct { DoCreate := func(t *testing.T, config *rest.Config) error {
client, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
_, err = client.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Create(
context.TODO(),
&unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": "test-dynamic-client-cbor-enablement",
},
},
},
metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}},
)
return err
}
DoApply := func(t *testing.T, config *rest.Config) error {
client, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
name := "test-dynamic-client-cbor-enablement"
_, err = client.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Apply(
context.TODO(),
name,
&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"metadata": map[string]interface{}{
"name": name,
},
},
},
metav1.ApplyOptions{
FieldManager: "foo-bar",
DryRun: []string{metav1.DryRunAll},
},
)
return err
}
testCases := []struct {
name string name string
serving bool serving bool
allowed bool allowed bool
@ -312,6 +359,7 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType string wantResponseContentType string
wantResponseStatus int wantResponseStatus int
wantStatusError bool wantStatusError bool
doRequest func(t *testing.T, config *rest.Config) error
}{ }{
{ {
name: "sends cbor accepts both gets cbor", name: "sends cbor accepts both gets cbor",
@ -323,6 +371,7 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType: "application/cbor", wantResponseContentType: "application/cbor",
wantResponseStatus: http.StatusCreated, wantResponseStatus: http.StatusCreated,
wantStatusError: false, wantStatusError: false,
doRequest: DoCreate,
}, },
{ {
name: "sends cbor accepts both gets 415", name: "sends cbor accepts both gets 415",
@ -334,6 +383,7 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType: "application/json", wantResponseContentType: "application/json",
wantResponseStatus: http.StatusUnsupportedMediaType, wantResponseStatus: http.StatusUnsupportedMediaType,
wantStatusError: true, wantStatusError: true,
doRequest: DoCreate,
}, },
{ {
name: "sends json accepts both gets cbor", name: "sends json accepts both gets cbor",
@ -345,6 +395,7 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType: "application/cbor", wantResponseContentType: "application/cbor",
wantResponseStatus: http.StatusCreated, wantResponseStatus: http.StatusCreated,
wantStatusError: false, wantStatusError: false,
doRequest: DoCreate,
}, },
{ {
name: "sends json accepts both gets json", name: "sends json accepts both gets json",
@ -356,6 +407,7 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType: "application/json", wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated, wantResponseStatus: http.StatusCreated,
wantStatusError: false, wantStatusError: false,
doRequest: DoCreate,
}, },
{ {
name: "sends json accepts json gets json with serving enabled", name: "sends json accepts json gets json with serving enabled",
@ -367,6 +419,7 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType: "application/json", wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated, wantResponseStatus: http.StatusCreated,
wantStatusError: false, wantStatusError: false,
doRequest: DoCreate,
}, },
{ {
name: "sends json accepts json gets json with serving disabled", name: "sends json accepts json gets json with serving disabled",
@ -378,6 +431,7 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType: "application/json", wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated, wantResponseStatus: http.StatusCreated,
wantStatusError: false, wantStatusError: false,
doRequest: DoCreate,
}, },
{ {
name: "sends json without both gates enabled", name: "sends json without both gates enabled",
@ -389,60 +443,92 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantResponseContentType: "application/json", wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated, wantResponseStatus: http.StatusCreated,
wantStatusError: false, wantStatusError: false,
doRequest: DoCreate,
}, },
} { {
t.Run(tc.name, func(t *testing.T) { name: "apply sends cbor accepts both gets cbor",
if tc.serving { serving: true,
allowed: true,
preferred: true,
wantRequestContentType: "application/apply-patch+cbor",
wantRequestAccept: "application/json;q=0.9,application/cbor;q=1",
wantResponseContentType: "application/cbor",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoApply,
},
{
name: "apply sends json accepts both gets cbor",
serving: true,
allowed: true,
preferred: false,
wantRequestContentType: "application/apply-patch+yaml",
wantRequestAccept: "application/json;q=0.9,application/cbor;q=1",
wantResponseContentType: "application/cbor",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoApply,
},
{
name: "apply sends cbor accepts both gets 415",
serving: false,
allowed: true,
preferred: true,
wantRequestContentType: "application/apply-patch+cbor",
wantRequestAccept: "application/json;q=0.9,application/cbor;q=1",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusUnsupportedMediaType,
wantStatusError: true,
doRequest: DoApply,
},
}
for _, serving := range []bool{true, false} {
t.Run(fmt.Sprintf("serving=%t", serving), func(t *testing.T) {
if serving {
framework.EnableCBORServingAndStorageForTest(t) framework.EnableCBORServingAndStorageForTest(t)
} }
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer server.TearDownFn() defer server.TearDownFn()
framework.SetTestOnlyCBORClientFeatureGatesForTest(t, tc.allowed, tc.preferred) for _, tc := range testCases {
if serving != tc.serving {
continue
}
config := rest.CopyConfig(server.ClientConfig) t.Run(tc.name, func(t *testing.T) {
config.Wrap(func(rt http.RoundTripper) http.RoundTripper { framework.SetTestOnlyCBORClientFeatureGatesForTest(t, tc.allowed, tc.preferred)
return roundTripperFunc(func(request *http.Request) (*http.Response, error) {
response, err := rt.RoundTrip(request) config := rest.CopyConfig(server.ClientConfig)
if got := response.Request.Header.Get("Content-Type"); got != tc.wantRequestContentType { config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
t.Errorf("want request content type %q, got %q", tc.wantRequestContentType, got) return roundTripperFunc(func(request *http.Request) (*http.Response, error) {
response, err := rt.RoundTrip(request)
if got := response.Request.Header.Get("Content-Type"); got != tc.wantRequestContentType {
t.Errorf("want request content type %q, got %q", tc.wantRequestContentType, got)
}
if got := response.Request.Header.Get("Accept"); got != tc.wantRequestAccept {
t.Errorf("want request accept %q, got %q", tc.wantRequestAccept, got)
}
if got := response.Header.Get("Content-Type"); got != tc.wantResponseContentType {
t.Errorf("want response content type %q, got %q", tc.wantResponseContentType, got)
}
if got := response.StatusCode; got != tc.wantResponseStatus {
t.Errorf("want response status %d, got %d", tc.wantResponseStatus, got)
}
return response, err
})
})
err := tc.doRequest(t, config)
switch {
case tc.wantStatusError && errors.IsUnsupportedMediaType(err):
// ok
case !tc.wantStatusError && err == nil:
// ok
default:
t.Errorf("unexpected error: %v", err)
} }
if got := response.Request.Header.Get("Accept"); got != tc.wantRequestAccept {
t.Errorf("want request accept %q, got %q", tc.wantRequestAccept, got)
}
if got := response.Header.Get("Content-Type"); got != tc.wantResponseContentType {
t.Errorf("want response content type %q, got %q", tc.wantResponseContentType, got)
}
if got := response.StatusCode; got != tc.wantResponseStatus {
t.Errorf("want response status %d, got %d", tc.wantResponseStatus, got)
}
return response, err
}) })
})
client, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
_, err = client.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Create(
context.TODO(),
&unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": "test-dynamic-client-cbor-enablement",
},
},
},
metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}},
)
switch {
case tc.wantStatusError && errors.IsUnsupportedMediaType(err):
// ok
case !tc.wantStatusError && err == nil:
// ok
default:
t.Errorf("unexpected error: %v", err)
} }
}) })
} }

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/cbor" "k8s.io/apimachinery/pkg/runtime/serializer/cbor"
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/direct"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -111,8 +112,6 @@ func EnableCBORServingAndStorageForTest(tb testing.TB) {
// AssertRequestResponseAsCBOR returns a transport.WrapperFunc that will report a test error if a // AssertRequestResponseAsCBOR returns a transport.WrapperFunc that will report a test error if a
// non-empty request or response body contains data that does not appear to be CBOR-encoded. // non-empty request or response body contains data that does not appear to be CBOR-encoded.
func AssertRequestResponseAsCBOR(t testing.TB) transport.WrapperFunc { func AssertRequestResponseAsCBOR(t testing.TB) transport.WrapperFunc {
recognizer := cbor.NewSerializer(runtime.NewScheme(), runtime.NewScheme())
unsupportedPatchContentTypes := sets.New( unsupportedPatchContentTypes := sets.New(
"application/json-patch+json", "application/json-patch+json",
"application/merge-patch+json", "application/merge-patch+json",
@ -126,12 +125,11 @@ func AssertRequestResponseAsCBOR(t testing.TB) transport.WrapperFunc {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
recognized, _, err := recognizer.RecognizesData(requestbody) if len(requestbody) > 0 {
if err != nil { err = direct.Unmarshal(requestbody, new(interface{}))
t.Error(err) if err != nil {
} t.Errorf("non-cbor request: 0x%x", requestbody)
if len(requestbody) > 0 && !recognized { }
t.Errorf("non-cbor request: 0x%x", requestbody)
} }
request.Body = io.NopCloser(bytes.NewReader(requestbody)) request.Body = io.NopCloser(bytes.NewReader(requestbody))
} }
@ -152,11 +150,10 @@ func AssertRequestResponseAsCBOR(t testing.TB) transport.WrapperFunc {
Closer: response.Body, Closer: response.Body,
} }
t.Cleanup(func() { t.Cleanup(func() {
recognized, _, err := recognizer.RecognizesData(buf.Bytes()) if buf.Len() == 0 {
if err != nil { return
t.Error(err)
} }
if buf.Len() > 0 && !recognized { if err := direct.Unmarshal(buf.Bytes(), new(interface{})); err != nil {
t.Errorf("non-cbor response: 0x%x", buf.Bytes()) t.Errorf("non-cbor response: 0x%x", buf.Bytes())
} }
}) })