diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/BUILD index 47a27a3340c..3d54973b890 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/BUILD @@ -103,6 +103,8 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/util/wsstream:go_default_library", "//vendor/github.com/evanphx/json-patch:go_default_library", "//vendor/golang.org/x/net/websocket:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/trace:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go index dd9b5e9d97c..f97cbaa2c85 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go @@ -27,6 +27,7 @@ import ( "unicode/utf8" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/validation" @@ -139,6 +140,16 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int } trace.Step("About to store object in database") + admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo) + requestFunc := func() (runtime.Object, error) { + return r.Create( + ctx, + name, + obj, + rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope), + options, + ) + } result, err := finishRequest(timeout, func() (runtime.Object, error) { if scope.FieldManager != nil { liveObj, err := scope.Creater.New(scope.Kind) @@ -150,20 +161,21 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int return nil, fmt.Errorf("failed to update object (Create for %v) managed fields: %v", scope.Kind, err) } } - - admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo) if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) { if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil { return nil, err } } - return r.Create( - ctx, - name, - obj, - rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope), - options, - ) + result, err := requestFunc() + // If the object wasn't committed to storage because it's serialized size was too large, + // it is safe to remove managedFields (which can be large) and try again. + if isTooLargeError(err) { + if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil { + accessor.SetManagedFields(nil) + result, err = requestFunc() + } + } + return result, err }) if err != nil { scope.err(err, w, req) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go index 41c69b35fff..e9d11001758 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go @@ -581,12 +581,28 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti wasCreated := false p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission) - result, err := finishRequest(p.timeout, func() (runtime.Object, error) { + requestFunc := func() (runtime.Object, error) { // Pass in UpdateOptions to override UpdateStrategy.AllowUpdateOnCreate options := patchToUpdateOptions(p.options) updateObject, created, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation, p.forceAllowCreate, options) wasCreated = created return updateObject, updateErr + } + result, err := finishRequest(p.timeout, func() (runtime.Object, error) { + result, err := requestFunc() + // If the object wasn't committed to storage because it's serialized size was too large, + // it is safe to remove managedFields (which can be large) and try again. + if isTooLargeError(err) && p.patchType != types.ApplyPatchType { + if _, accessorErr := meta.Accessor(p.restPatcher.New()); accessorErr == nil { + p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission, func(_ context.Context, obj, _ runtime.Object) (runtime.Object, error) { + accessor, _ := meta.Accessor(obj) + accessor.SetManagedFields(nil) + return obj, nil + }) + result, err = requestFunc() + } + } + return result, err }) return result, wasCreated, err } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index 11d13cd3120..97471637e56 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -25,8 +25,12 @@ import ( "net/http" "net/url" goruntime "runtime" + "strings" "time" + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -416,3 +420,28 @@ func parseTimeout(str string) time.Duration { func isDryRun(url *url.URL) bool { return len(url.Query()["dryRun"]) != 0 } + +type etcdError interface { + Code() grpccodes.Code + Error() string +} + +type grpcError interface { + GRPCStatus() *grpcstatus.Status +} + +func isTooLargeError(err error) bool { + if err != nil { + if etcdErr, ok := err.(etcdError); ok { + if etcdErr.Code() == grpccodes.InvalidArgument && etcdErr.Error() == "etcdserver: request is too large" { + return true + } + } + if grpcErr, ok := err.(grpcError); ok { + if grpcErr.GRPCStatus().Code() == grpccodes.ResourceExhausted && strings.Contains(grpcErr.GRPCStatus().Message(), "trying to send message larger than max") { + return true + } + } + } + return false +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go index 1e75fdb06bd..c58fe9d5792 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/validation" @@ -124,15 +125,22 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa userInfo, _ := request.UserFrom(ctx) transformers := []rest.TransformFunc{} + + // allows skipping managedFields update if the resulting object is too big + shouldUpdateManagedFields := true if scope.FieldManager != nil { transformers = append(transformers, func(_ context.Context, newObj, liveObj runtime.Object) (runtime.Object, error) { - obj, err := scope.FieldManager.Update(liveObj, newObj, managerOrUserAgent(options.FieldManager, req.UserAgent())) - if err != nil { - return nil, fmt.Errorf("failed to update object (Update for %v) managed fields: %v", scope.Kind, err) + if shouldUpdateManagedFields { + obj, err := scope.FieldManager.Update(liveObj, newObj, managerOrUserAgent(options.FieldManager, req.UserAgent())) + if err != nil { + return nil, fmt.Errorf("failed to update object (Update for %v) managed fields: %v", scope.Kind, err) + } + return obj, nil } - return obj, nil + return newObj, nil }) } + if mutatingAdmission, ok := admit.(admission.MutationInterface); ok { transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) { isNotZeroObject, err := hasUID(oldObj) @@ -149,7 +157,6 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa } return newObj, nil }) - } createAuthorizerAttributes := authorizer.AttributesRecord{ @@ -167,7 +174,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa trace.Step("About to store object in database") wasCreated := false - result, err := finishRequest(timeout, func() (runtime.Object, error) { + requestFunc := func() (runtime.Object, error) { obj, created, err := r.Update( ctx, name, @@ -184,6 +191,19 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa ) wasCreated = created return obj, err + } + result, err := finishRequest(timeout, func() (runtime.Object, error) { + result, err := requestFunc() + // If the object wasn't committed to storage because it's serialized size was too large, + // it is safe to remove managedFields (which can be large) and try again. + if isTooLargeError(err) && scope.FieldManager != nil { + if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil { + accessor.SetManagedFields(nil) + shouldUpdateManagedFields = false + result, err = requestFunc() + } + } + return result, err }) if err != nil { scope.err(err, w, req) diff --git a/test/integration/apiserver/apply/BUILD b/test/integration/apiserver/apply/BUILD index e2f835972a0..c9f8dc267fe 100644 --- a/test/integration/apiserver/apply/BUILD +++ b/test/integration/apiserver/apply/BUILD @@ -25,6 +25,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", diff --git a/test/integration/apiserver/apply/apply_test.go b/test/integration/apiserver/apply/apply_test.go index 921ef1f7644..496daedb257 100644 --- a/test/integration/apiserver/apply/apply_test.go +++ b/test/integration/apiserver/apply/apply_test.go @@ -36,6 +36,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" genericfeatures "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" @@ -440,6 +441,177 @@ func TestApplyGroupsManySeparateUpdates(t *testing.T) { } } +// TestCreateVeryLargeObject tests that a very large object can be created without exceeding the size limit due to managedFields +func TestCreateVeryLargeObject(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)() + + _, client, closeFn := setup(t) + defer closeFn() + + cfg := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "large-create-test-cm", + Namespace: "default", + }, + Data: map[string]string{}, + } + + for i := 0; i < 9999; i++ { + unique := fmt.Sprintf("this-key-is-very-long-so-as-to-create-a-very-large-serialized-fieldset-%v", i) + cfg.Data[unique] = "A" + } + + // Should be able to create an object near the object size limit. + if _, err := client.CoreV1().ConfigMaps(cfg.Namespace).Create(context.TODO(), cfg, metav1.CreateOptions{}); err != nil { + t.Errorf("unable to create large test configMap: %v", err) + } + + // Applying to the same object should cause managedFields to go over the object size limit, and fail. + _, err := client.CoreV1().RESTClient().Patch(types.ApplyPatchType). + Namespace(cfg.Namespace). + Resource("configmaps"). + Name(cfg.Name). + Param("fieldManager", "apply_test"). + Body([]byte(`{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": "large-create-test-cm", + "namespace": "default", + } + }`)). + Do(context.TODO()). + Get() + if err == nil { + t.Fatalf("expected to fail to update object using Apply patch, but succeeded") + } +} + +// TestUpdateVeryLargeObject tests that a small object can be updated to be very large without exceeding the size limit due to managedFields +func TestUpdateVeryLargeObject(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)() + + _, client, closeFn := setup(t) + defer closeFn() + + cfg := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "large-update-test-cm", + Namespace: "default", + }, + Data: map[string]string{"k": "v"}, + } + + // Create a small config map. + cfg, err := client.CoreV1().ConfigMaps(cfg.Namespace).Create(context.TODO(), cfg, metav1.CreateOptions{}) + if err != nil { + t.Errorf("unable to create configMap: %v", err) + } + + // Should be able to update a small object to be near the object size limit. + var updateErr error + pollErr := wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) { + updateCfg, err := client.CoreV1().ConfigMaps(cfg.Namespace).Get(context.TODO(), cfg.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + + // Apply the large update, then attempt to push it to the apiserver. + for i := 0; i < 9999; i++ { + unique := fmt.Sprintf("this-key-is-very-long-so-as-to-create-a-very-large-serialized-fieldset-%v", i) + updateCfg.Data[unique] = "A" + } + + if _, err = client.CoreV1().ConfigMaps(cfg.Namespace).Update(context.TODO(), updateCfg, metav1.UpdateOptions{}); err == nil { + return true, nil + } + updateErr = err + return false, nil + }) + if pollErr == wait.ErrWaitTimeout { + t.Errorf("unable to update configMap: %v", updateErr) + } + + // Applying to the same object should cause managedFields to go over the object size limit, and fail. + _, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType). + Namespace(cfg.Namespace). + Resource("configmaps"). + Name(cfg.Name). + Param("fieldManager", "apply_test"). + Body([]byte(`{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": "large-update-test-cm", + "namespace": "default", + } + }`)). + Do(context.TODO()). + Get() + if err == nil { + t.Fatalf("expected to fail to update object using Apply patch, but succeeded") + } +} + +// TestPatchVeryLargeObject tests that a small object can be patched to be very large without exceeding the size limit due to managedFields +func TestPatchVeryLargeObject(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)() + + _, client, closeFn := setup(t) + defer closeFn() + + cfg := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "large-patch-test-cm", + Namespace: "default", + }, + Data: map[string]string{"k": "v"}, + } + + // Create a small config map. + if _, err := client.CoreV1().ConfigMaps(cfg.Namespace).Create(context.TODO(), cfg, metav1.CreateOptions{}); err != nil { + t.Errorf("unable to create configMap: %v", err) + } + + patchString := `{"data":{"k":"v"` + for i := 0; i < 9999; i++ { + unique := fmt.Sprintf("this-key-is-very-long-so-as-to-create-a-very-large-serialized-fieldset-%v", i) + patchString = fmt.Sprintf("%s,%q:%q", patchString, unique, "A") + } + patchString = fmt.Sprintf("%s}}", patchString) + + // Should be able to update a small object to be near the object size limit. + _, err := client.CoreV1().RESTClient().Patch(types.MergePatchType). + AbsPath("/api/v1"). + Namespace(cfg.Namespace). + Resource("configmaps"). + Name(cfg.Name). + Body([]byte(patchString)).Do(context.TODO()).Get() + if err != nil { + t.Errorf("unable to patch configMap: %v", err) + } + + // Applying to the same object should cause managedFields to go over the object size limit, and fail. + _, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType). + Namespace("default"). + Resource("configmaps"). + Name("large-patch-test-cm"). + Param("fieldManager", "apply_test"). + Body([]byte(`{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": "large-patch-test-cm", + "namespace": "default", + } + }`)). + Do(context.TODO()). + Get() + if err == nil { + t.Fatalf("expected to fail to update object using Apply patch, but succeeded") + } +} + // TestApplyManagedFields makes sure that managedFields api does not change func TestApplyManagedFields(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()