diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 55f97ba6d80..1f0f3a8a4c6 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -211,6 +211,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, }, + genericfeatures.AllowUnsafeMalformedObjectDeletion: { + {Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha}, + }, + genericfeatures.AnonymousAuthConfigurableEndpoints: { {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha}, {Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta}, diff --git a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go index 57e0e71f672..6a3ab8f24e2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go @@ -54,6 +54,7 @@ var knownReasons = map[metav1.StatusReason]struct{}{ metav1.StatusReasonGone: {}, metav1.StatusReasonInvalid: {}, metav1.StatusReasonServerTimeout: {}, + metav1.StatusReasonStoreReadError: {}, metav1.StatusReasonTimeout: {}, metav1.StatusReasonTooManyRequests: {}, metav1.StatusReasonBadRequest: {}, @@ -775,6 +776,12 @@ func IsUnexpectedObjectError(err error) bool { return err != nil && (ok || errors.As(err, &uoe)) } +// IsStoreReadError determines if err is due to either failure to transform the +// data from the storage, or failure to decode the object appropriately. +func IsStoreReadError(err error) bool { + return ReasonForError(err) == metav1.StatusReasonStoreReadError +} + // SuggestsClientDelay returns true if this error suggests a client delay as well as the // suggested seconds to wait, or false if the error does not imply a wait. It does not // address whether the error *should* be retried, since some errors (like a 3xx) may diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go index f4cc8e38bbe..4cf3f47956c 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go @@ -931,6 +931,22 @@ const ( // Status code 500 StatusReasonServerTimeout StatusReason = "ServerTimeout" + // StatusReasonStoreReadError means that the server encountered an error while + // retrieving resources from the backend object store. + // This may be due to backend database error, or because processing of the read + // resource failed. + // Details: + // "kind" string - the kind attribute of the resource being acted on. + // "name" string - the prefix where the reading error(s) occurred + // "causes" []StatusCause + // - (optional): + // - "type" CauseType - CauseTypeUnexpectedServerResponse + // - "message" string - the error message from the store backend + // - "field" string - the full path with the key of the resource that failed reading + // + // Status code 500 + StatusReasonStoreReadError StatusReason = "StorageReadError" + // StatusReasonTimeout means that the request could not be completed within the given time. // Clients can get this response only when they specified a timeout param in the request, // or if the server cannot complete the operation within a reasonable amount of time. diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go index 390eaeef4d2..cb70f8cf0a0 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go @@ -118,6 +118,9 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc } } } + if !utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) && options != nil { + options.IgnoreStoreReadErrorWithClusterBreakingPotential = nil + } if errs := validation.ValidateDeleteOptions(options); len(errs) > 0 { err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", errs) scope.err(err, w, req) @@ -125,6 +128,22 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc } options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("DeleteOptions")) + if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) { + if options != nil && ptr.Deref(options.IgnoreStoreReadErrorWithClusterBreakingPotential, false) { + // let's make sure that the audit will reflect that this delete request + // was tried with ignoreStoreReadErrorWithClusterBreakingPotential enabled + audit.AddAuditAnnotation(ctx, "apiserver.k8s.io/unsafe-delete-ignore-read-error", "") + + p, ok := r.(rest.CorruptObjectDeleterProvider) + if !ok || p.GetCorruptObjDeleter() == nil { + // this is a developer error + scope.err(errors.NewInternalError(fmt.Errorf("no unsafe deleter provided, can not honor ignoreStoreReadErrorWithClusterBreakingPotential")), w, req) + return + } + r = p.GetCorruptObjDeleter() + } + } + span.AddEvent("About to delete object from database") wasDeleted := true userInfo, _ := request.UserFrom(ctx) @@ -262,19 +281,24 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc } } } + if !utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) && options != nil { + options.IgnoreStoreReadErrorWithClusterBreakingPotential = nil + } if errs := validation.ValidateDeleteOptions(options); len(errs) > 0 { err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", errs) scope.err(err, w, req) return } - if options != nil && ptr.Deref(options.IgnoreStoreReadErrorWithClusterBreakingPotential, true) { - fieldErrList := field.ErrorList{ - field.Invalid(field.NewPath("ignoreStoreReadErrorWithClusterBreakingPotential"), true, "is not allowed with DELETECOLLECTION, try again after removing the option"), + if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) { + if options != nil && ptr.Deref(options.IgnoreStoreReadErrorWithClusterBreakingPotential, true) { + fieldErrList := field.ErrorList{ + field.Invalid(field.NewPath("ignoreStoreReadErrorWithClusterBreakingPotential"), true, "is not allowed with DELETECOLLECTION, try again after removing the option"), + } + err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", fieldErrList) + scope.err(err, w, req) + return } - err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", fieldErrList) - scope.err(err, w, req) - return } options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("DeleteOptions")) diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 02debfb7669..d0b064dc2c9 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -54,6 +54,15 @@ const ( // Allows us to enable anonymous auth for only certain apiserver endpoints. AnonymousAuthConfigurableEndpoints featuregate.Feature = "AnonymousAuthConfigurableEndpoints" + // owner: @stlaz @tkashem @dgrisonnet + // kep: https://kep.k8s.io/3926 + // + // Enables the cluster admin to identify resources that fail to + // decrypt or fail to be decoded into an object, and introduces + // a new delete option to allow deletion of such corrupt + // resources using the Kubernetes API only. + AllowUnsafeMalformedObjectDeletion featuregate.Feature = "AllowUnsafeMalformedObjectDeletion" + // owner: @smarterclayton // stable: 1.29 // @@ -264,6 +273,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, }, + AllowUnsafeMalformedObjectDeletion: { + {Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha}, + }, + AnonymousAuthConfigurableEndpoints: { {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha}, {Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta}, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/corrupt_obj_deleter.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/corrupt_obj_deleter.go new file mode 100644 index 00000000000..4907da47f58 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/corrupt_obj_deleter.go @@ -0,0 +1,122 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "context" + "errors" + "strings" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storage" + storeerr "k8s.io/apiserver/pkg/storage/errors" + + "k8s.io/klog/v2" + "k8s.io/utils/ptr" +) + +// the corrupt object deleter has the same interface as rest.GracefulDeleter +var _ rest.GracefulDeleter = &corruptObjectDeleter{} + +// NewCorruptObjectDeleter returns a deleter that can perform unsafe deletion +// of corrupt objects, it makes an attempt to perform a normal deletion flow +// first, and if the normal deletion flow fails with a corrupt object error +// then it performs the unsafe delete of the object. +// +// NOTE: it skips precondition checks, finalizer constraints, and any +// post deletion hook defined in 'AfterDelete' of the registry. +// +// WARNING: This may break the cluster if the resource being deleted has dependencies. +func NewCorruptObjectDeleter(store *Store) rest.GracefulDeleter { + return &corruptObjectDeleter{store: store} +} + +// corruptObjectDeleter implements unsafe object deletion flow +type corruptObjectDeleter struct { + store *Store +} + +// Delete performs an unsafe deletion of the given resource from the storage. +// +// NOTE: This function should NEVER be used for any normal deletion +// flow, it is exclusively used when the user enables +// 'IgnoreStoreReadErrorWithClusterBreakingPotential' in the delete options. +func (d *corruptObjectDeleter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, opts *metav1.DeleteOptions) (runtime.Object, bool, error) { + if opts == nil || !ptr.Deref[bool](opts.IgnoreStoreReadErrorWithClusterBreakingPotential, false) { + // this is a developer error, we should never be here, since the unsafe + // deleter is wired in the rest layer only when the option is enabled + return nil, false, apierrors.NewInternalError(errors.New("initialization error, expected normal deletion flow to be used")) + } + + key, err := d.store.KeyFunc(ctx, name) + if err != nil { + return nil, false, err + } + obj := d.store.NewFunc() + qualifiedResource := d.store.qualifiedResourceFromContext(ctx) + // use the storage implementation directly, bypass the dryRun layer + storageBackend := d.store.Storage.Storage + // we leave ResourceVersion as empty in the GetOptions so the + // object is retrieved from the underlying storage directly + err = storageBackend.Get(ctx, key, storage.GetOptions{}, obj) + if err == nil || !storage.IsCorruptObject(err) { + // TODO: The Invalid error should have a field for Resource. + // After that field is added, we should fill the Resource and + // leave the Kind field empty. See the discussion in #18526. + qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource} + fieldErrList := field.ErrorList{ + field.Invalid(field.NewPath("ignoreStoreReadErrorWithClusterBreakingPotential"), true, "is exclusively used to delete corrupt object(s), try again by removing this option"), + } + return nil, false, apierrors.NewInvalid(qualifiedKind, name, fieldErrList) + } + + // try normal deletion anyway, it is expected to fail + obj, deleted, err := d.store.Delete(ctx, name, deleteValidation, opts) + if err == nil { + return obj, deleted, err + } + // TODO: unfortunately we can't do storage.IsCorruptObject(err), + // conversion to API error drops the inner error chain + if !strings.Contains(err.Error(), "corrupt object") { + return obj, deleted, err + } + + // TODO: at this instant, some actor may have a) managed to recreate this + // object by doing a delete+create, or b) the underlying error has resolved + // since the last time we checked, and the object is readable now. + klog.FromContext(ctx).V(1).Info("Going to perform unsafe object deletion", "object", klog.KRef(genericapirequest.NamespaceValue(ctx), name)) + out := d.store.NewFunc() + storageOpts := storage.DeleteOptions{IgnoreStoreReadError: true} + // dropping preconditions, and keeping the admission + if err := storageBackend.Delete(ctx, key, out, nil, storage.ValidateObjectFunc(deleteValidation), nil, storageOpts); err != nil { + if storage.IsNotFound(err) { + // the DELETE succeeded, but we don't have the object since it's + // not retrievable from the storage, so we send a nil object + return nil, false, nil + } + return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name) + } + // the DELETE succeeded, but we don't have the object sine it's + // not retrievable from the storage, so we send a nil objct + return nil, true, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/corrupt_obj_deleter_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/corrupt_obj_deleter_test.go new file mode 100644 index 00000000000..1f06e269f3b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/corrupt_obj_deleter_test.go @@ -0,0 +1,288 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "context" + "fmt" + "strings" + "testing" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/apis/example" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storage" + + "k8s.io/utils/ptr" +) + +type result struct { + deleted bool + err error +} + +type deleteWant struct { + deleted bool + checkErr func(err error) bool +} + +var ( + wantNoError = func(err error) bool { return err == nil } + wantErrContains = func(shouldContain string) func(error) bool { + return func(err error) bool { + return err != nil && strings.Contains(err.Error(), shouldContain) + } + } +) + +func (w deleteWant) verify(t *testing.T, got result) { + t.Helper() + + if !w.checkErr(got.err) { + t.Errorf("Unexpected failure with the deletion operation, got: %v", got.err) + } + if w.deleted != got.deleted { + t.Errorf("Expected deleted to be: %t, but got: %t", w.deleted, got.deleted) + } + +} + +func TestUnsafeDeletePrecondition(t *testing.T) { + option := func(enabled bool) *metav1.DeleteOptions { + return &metav1.DeleteOptions{ + IgnoreStoreReadErrorWithClusterBreakingPotential: ptr.To[bool](enabled), + } + } + + const ( + unsafeDeleteNotAllowed = "ignoreStoreReadErrorWithClusterBreakingPotential: Invalid value: true: is exclusively used to delete corrupt object(s), try again by removing this option" + internalErr = "Internal error occurred: initialization error, expected normal deletion flow to be used" + ) + + tests := []struct { + name string + err error + opts *metav1.DeleteOptions + invoked int + want deleteWant + }{ + { + name: "option nil, should throw internal error", + opts: nil, + want: deleteWant{checkErr: wantErrContains(internalErr)}, + }, + { + name: "option empty, should throw internal error", + opts: &metav1.DeleteOptions{}, + want: deleteWant{checkErr: wantErrContains(internalErr)}, + }, + { + name: "option false, should throw internal error", + opts: option(false), + want: deleteWant{checkErr: wantErrContains(internalErr)}, + }, + { + name: "option true, object readable, should throw invalid error", + opts: option(true), + want: deleteWant{ + checkErr: wantErrContains(unsafeDeleteNotAllowed), + }, + }, + { + name: "option true, object not readable with unexpected error, should throw invalid error", + opts: option(true), + err: fmt.Errorf("unexpected error"), + want: deleteWant{ + checkErr: wantErrContains(unsafeDeleteNotAllowed), + }, + }, + { + name: "option true, object not readable with storage internal error, should throw invalid error", + opts: option(true), + err: storage.NewInternalError(fmt.Errorf("unexpected error")), + want: deleteWant{ + checkErr: wantErrContains(unsafeDeleteNotAllowed), + }, + }, + { + name: "option true, object not readable with corrupt object error, unsafe-delete should trigger", + opts: option(true), + err: storage.NewCorruptObjError("foo", fmt.Errorf("object not decodable")), + want: deleteWant{ + deleted: true, + checkErr: wantNoError, + }, + invoked: 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + object := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: example.PodSpec{NodeName: "machine"}, + } + _, err := registry.Create(ctx, object, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Unexpected error from Create: %v", err) + } + + // wrap the storage so it returns the expected error + cs := &corruptStorage{ + Interface: registry.Storage.Storage, + err: test.err, + } + registry.Storage.Storage = cs + deleter := NewCorruptObjectDeleter(registry) + + _, deleted, err := deleter.Delete(ctx, "foo", rest.ValidateAllObjectFunc, test.opts) + + got := result{deleted: deleted, err: err} + test.want.verify(t, got) + if want, got := test.invoked, cs.unsafeDeleteInvoked; want != got { + t.Errorf("Expected unsafe-delete to be invoked %d time(s), but got: %d", want, got) + } + }) + } +} + +func TestUnsafeDeleteWithCorruptObject(t *testing.T) { + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + object := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: example.PodSpec{NodeName: "machine"}, + } + // a) prerequisite: try deleting the object, we expect a not found error + _, _, err := registry.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil) + if !errors.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } + + // b) create the target object + _, err = registry.Create(ctx, object, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // c) wrap the storage to return corrupt object error + cs := &corruptStorage{ + Interface: registry.Storage.Storage, + err: storage.NewCorruptObjError("key", fmt.Errorf("untransformable")), + } + registry.Storage.Storage = cs + + got := result{} + // d) try deleting the traget object + _, got.deleted, got.err = registry.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil) + want := deleteWant{checkErr: errors.IsInternalError} + want.verify(t, got) + + // e) set up an unsafe-deleter + deleter := NewCorruptObjectDeleter(registry) + + // f) try to delete the object, but don't set the delete option just yet + _, got.deleted, got.err = deleter.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil) + want.verify(t, got) + + // g) this time, set the delete option to ignore store read error + _, got.deleted, got.err = deleter.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{ + IgnoreStoreReadErrorWithClusterBreakingPotential: ptr.To[bool](true), + }) + want = deleteWant{ + deleted: true, + checkErr: wantNoError, + } + want.verify(t, got) + if want, got := 1, cs.unsafeDeleteInvoked; want != got { + t.Errorf("Expected unsafe-delete to be invoked %d time(s), but got: %d", want, got) + } +} + +func TestUnsafeDeleteWithUnexpectedError(t *testing.T) { + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + // TODO: inject a corrupt transformer + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + object := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: example.PodSpec{NodeName: "machine"}, + } + // a) create the target object + _, err := registry.Create(ctx, object, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // b) wrap the storage to return corrupt object error + cs := &corruptStorage{ + Interface: registry.Storage.Storage, + err: storage.NewInternalError(fmt.Errorf("unexpected error")), + } + registry.Storage.Storage = cs + + // c) try deleting the object using normal deletion flow + got := result{} + _, got.deleted, got.err = registry.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil) + want := deleteWant{checkErr: errors.IsInternalError} + want.verify(t, got) + + // d) set up a corrupt object deleter for the registry + deleter := NewCorruptObjectDeleter(registry) + + // e) try deleting with unsafe-delete + _, got.deleted, got.err = deleter.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{ + IgnoreStoreReadErrorWithClusterBreakingPotential: ptr.To[bool](true), + }) + want = deleteWant{ + checkErr: wantErrContains("is exclusively used to delete corrupt object(s), try again by removing this option"), + } + want.verify(t, got) + if want, got := 0, cs.unsafeDeleteInvoked; want != got { + t.Errorf("Expected unsafe-delete to be invoked %d time(s), but got: %d", want, got) + } +} + +type corruptStorage struct { + storage.Interface + err error + unsafeDeleteInvoked int +} + +func (s *corruptStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + if s.err != nil { + return s.err + } + return s.Interface.Get(ctx, key, opts, objPtr) +} + +func (s *corruptStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { + if opts.IgnoreStoreReadError { + s.unsafeDeleteInvoked++ + } + return s.Interface.Delete(ctx, key, out, preconditions, deleteValidation, cachedExistingObject, opts) +} diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 90602ad0d4e..ba64fc367be 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -234,6 +234,18 @@ type Store struct { // If set, DestroyFunc has to be implemented in thread-safe way and // be prepared for being called more than once. DestroyFunc func() + + // corruptObjDeleter implements unsafe deletion flow to enable deletion + // of corrupt object(s), it makes an attempt to perform a normal + // deletion flow first, and if the normal deletion flow fails with a + // corrupt object error then it proceeds with the unsafe deletion + // of the object from the storage. + // NOTE: it skips precondition checks, finalizer constraints, and any + // after delete hook defined in 'AfterDelete' of the registry. + // WARNING: This may break the cluster if the resource has + // dependencies. Use when the cluster is broken, and there is no + // other viable option to repair the cluster. + corruptObjDeleter rest.GracefulDeleter } // Note: the rest.StandardStorage interface aggregates the common REST verbs @@ -244,6 +256,8 @@ var _ GenericStore = &Store{} var _ rest.SingularNameProvider = &Store{} +var _ rest.CorruptObjectDeleterProvider = &Store{} + const ( OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again" resourceCountPollPeriodJitter = 1.2 @@ -344,6 +358,11 @@ func (e *Store) GetDeleteStrategy() rest.RESTDeleteStrategy { return e.DeleteStrategy } +// GetCorruptObjDeleter returns the unsafe corrupt object deleter +func (e *Store) GetCorruptObjDeleter() rest.GracefulDeleter { + return e.corruptObjDeleter +} + // List returns a list of items matching labels and field according to the // store's PredicateFunc. func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { @@ -1631,6 +1650,10 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { e.ReadinessCheckFunc = e.Storage.Storage.ReadinessCheck } + if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) { + e.corruptObjDeleter = NewCorruptObjectDeleter(e) + } + return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index fa023917058..f364635e892 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -407,3 +407,10 @@ type UpdateResetFieldsStrategy interface { RESTUpdateStrategy ResetFieldsStrategy } + +// CorruptObjectDeleterProvider is an interface the storage implements +// to support unsafe deletion of corrupt object(s). It returns a +// GracefulDeleter that is used to perform unsafe deletion of corrupt object(s). +type CorruptObjectDeleterProvider interface { + GetCorruptObjDeleter() GracefulDeleter +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/errors.go b/staging/src/k8s.io/apiserver/pkg/storage/errors.go index d67235a53ae..5bae365a884 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/errors.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/errors.go @@ -37,6 +37,7 @@ const ( ErrCodeInvalidObj ErrCodeUnreachable ErrCodeTimeout + ErrCodeCorruptObj ) var errCodeToMessage = map[int]string{ @@ -46,6 +47,7 @@ var errCodeToMessage = map[int]string{ ErrCodeInvalidObj: "invalid object", ErrCodeUnreachable: "server unreachable", ErrCodeTimeout: "request timeout", + ErrCodeCorruptObj: "corrupt object", } func NewKeyNotFoundError(key string, rv int64) *StorageError { @@ -82,30 +84,45 @@ func NewUnreachableError(key string, rv int64) *StorageError { func NewTimeoutError(key, msg string) *StorageError { return &StorageError{ - Code: ErrCodeTimeout, - Key: key, - AdditionalErrorMsg: msg, + Code: ErrCodeTimeout, + Key: key, + err: errors.New(msg), } } func NewInvalidObjError(key, msg string) *StorageError { return &StorageError{ - Code: ErrCodeInvalidObj, - Key: key, - AdditionalErrorMsg: msg, + Code: ErrCodeInvalidObj, + Key: key, + err: errors.New(msg), + } +} + +// NewCorruptObjError returns a new StorageError, it represents a corrupt object: +// a) object data retrieved from the storage failed to transform with the given err. +// b) the given object failed to decode with the given err +func NewCorruptObjError(key string, err error) *StorageError { + return &StorageError{ + Code: ErrCodeCorruptObj, + Key: key, + err: err, } } type StorageError struct { - Code int - Key string - ResourceVersion int64 - AdditionalErrorMsg string + Code int + Key string + ResourceVersion int64 + + // inner error + err error } +func (e *StorageError) Unwrap() error { return e.err } + func (e *StorageError) Error() string { - return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s", - errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg) + return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %v", + errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.err) } // IsNotFound returns true if and only if err is "key" not found error. @@ -138,6 +155,21 @@ func IsInvalidObj(err error) bool { return isErrCode(err, ErrCodeInvalidObj) } +// IsCorruptObject returns true if and only if: +// a) the given object data retrieved from the storage is not transformable, or +// b) the given object failed to decode properly +func IsCorruptObject(err error) bool { + if err == nil { + return false + } + var storageErr *StorageError + if !errors.As(err, &storageErr) { + return false + } + + return storageErr.Code == ErrCodeCorruptObj +} + func isErrCode(err error, code int) bool { if err == nil { return false diff --git a/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go b/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go index f6e1bac9776..758cfbec5ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go @@ -32,6 +32,8 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level case storage.IsInternalError(err): return errors.NewInternalError(err) + case storage.IsCorruptObject(err): + return errors.NewInternalError(err) default: return err } @@ -47,6 +49,8 @@ func InterpretGetError(err error, qualifiedResource schema.GroupResource, name s return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level case storage.IsInternalError(err): return errors.NewInternalError(err) + case storage.IsCorruptObject(err): + return errors.NewInternalError(err) default: return err } @@ -62,6 +66,8 @@ func InterpretCreateError(err error, qualifiedResource schema.GroupResource, nam return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level case storage.IsInternalError(err): return errors.NewInternalError(err) + case storage.IsCorruptObject(err): + return errors.NewInternalError(err) default: return err } @@ -79,6 +85,8 @@ func InterpretUpdateError(err error, qualifiedResource schema.GroupResource, nam return errors.NewNotFound(qualifiedResource, name) case storage.IsInternalError(err): return errors.NewInternalError(err) + case storage.IsCorruptObject(err): + return errors.NewInternalError(err) default: return err } @@ -96,6 +104,8 @@ func InterpretDeleteError(err error, qualifiedResource schema.GroupResource, nam return errors.NewConflict(qualifiedResource, name, err) case storage.IsInternalError(err): return errors.NewInternalError(err) + case storage.IsCorruptObject(err): + return errors.NewInternalError(err) default: return err } @@ -110,6 +120,8 @@ func InterpretWatchError(err error, resource schema.GroupResource, name string) return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs) case storage.IsInternalError(err): return errors.NewInternalError(err) + case storage.IsCorruptObject(err): + return errors.NewInternalError(err) default: return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/corrupt_obj_deleter.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/corrupt_obj_deleter.go new file mode 100644 index 00000000000..f94c56eaac0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/corrupt_obj_deleter.go @@ -0,0 +1,270 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "context" + "errors" + "fmt" + "net/http" + "strings" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/value" + "k8s.io/klog/v2" +) + +// NewStoreWithUnsafeCorruptObjectDeletion wraps the given store implementation +// and adds support for unsafe deletion of corrupt objects +func NewStoreWithUnsafeCorruptObjectDeletion(delegate storage.Interface, gr schema.GroupResource) storage.Interface { + return &corruptObjectDeleter{ + Interface: delegate, + groupResource: gr, + } +} + +// WithCorruptObjErrorHandlingDecoder decorates the given decoder, it determines +// if the error returned by the given decoder represents a corrupt object (the +// object is undecodable), and then it wraps the error appropriately so the +// unsafe deleter can determine if the object is a candidate for unsafe deletion +func WithCorruptObjErrorHandlingDecoder(decoder Decoder) Decoder { + return &corruptObjErrorInterpretingDecoder{Decoder: decoder} +} + +// WithCorruptObjErrorHandlingTransformer decorates the given decoder, it +// determines if the error returned by the given transformer represents a +// corrupt object (the data from the storage is untransformable), and then it +// wraps the error appropriately so the unsafe deleter can determine +// if the object is a candidate for unsafe deletion +func WithCorruptObjErrorHandlingTransformer(transformer value.Transformer) value.Transformer { + return &corruptObjErrorInterpretingTransformer{Transformer: transformer} +} + +// corruptObjErrAggregatorFactory returns an error aggregator that aggregates +// corrupt object error(s) that the list operation encounters while +// retrieving objects from the storage. +// maxCount: it is the maximum number of error that will be aggregated +func corruptObjErrAggregatorFactory(maxCount int) func() ListErrorAggregator { + if maxCount <= 0 { + return defaultListErrorAggregatorFactory + } + return func() ListErrorAggregator { + return &corruptObjErrAggregator{maxCount: maxCount} + } +} + +var errTooMany = errors.New("too many errors, the list is truncated") + +// aggregate corrupt object errors from the LIST operation +type corruptObjErrAggregator struct { + errs []error + abortErr error + maxCount int +} + +func (a *corruptObjErrAggregator) Aggregate(key string, err error) bool { + if len(a.errs) >= a.maxCount { + // add a sentinel error to indicate there are more + a.errs = append(a.errs, errTooMany) + return true + } + var corruptObjErr *corruptObjectError + if errors.As(err, &corruptObjErr) { + a.errs = append(a.errs, storage.NewCorruptObjError(key, corruptObjErr)) + return false + } + + // not a corrupt object error, the list operation should abort + a.abortErr = err + return true +} + +func (a *corruptObjErrAggregator) Err() error { + switch { + case len(a.errs) == 0 && a.abortErr != nil: + return a.abortErr + case len(a.errs) > 0: + err := utilerrors.NewAggregate(a.errs) + return &aggregatedStorageError{errs: err, resourcePrefix: "list"} + default: + return nil + } +} + +// corruptObjectDeleter facilitates unsafe deletion of corrupt objects for etcd +type corruptObjectDeleter struct { + storage.Interface + groupResource schema.GroupResource +} + +func (s *corruptObjectDeleter) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error { + if err := s.Interface.Get(ctx, key, opts, out); err != nil { + var corruptObjErr *corruptObjectError + if !errors.As(err, &corruptObjErr) { + // this error does not represent a corrupt object + return err + } + // the unsafe deleter at the registry layer will check whether + // the given err represents a corrupt object in order to + // initiate the unsafe deletion flow. + return storage.NewCorruptObjError(key, corruptObjErr) + } + return nil +} + +func (s *corruptObjectDeleter) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + err := s.Interface.GetList(ctx, key, opts, listObj) + if err == nil { + return nil + } + + var aggregatedErr *aggregatedStorageError + if errors.As(err, &aggregatedErr) { + // we have aggregated a list of corrupt objects + klog.V(5).ErrorS(aggregatedErr, "corrupt objects") + return aggregatedErr.NewAPIStatusError(s.groupResource) + } + return err +} + +// corruptObjErrorInterpretingDecoder wraps the error returned by the decorated decoder +type corruptObjErrorInterpretingDecoder struct { + Decoder +} + +func (d *corruptObjErrorInterpretingDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error { + // TODO: right now any error is deemed as undecodable, in + // the future, we can apply some filter, if need be. + if err := d.Decoder.Decode(value, objPtr, rev); err != nil { + return &corruptObjectError{err: err, errType: undecodable, revision: rev} + } + return nil +} + +// decodeListItem decodes bytes value in array into object. +func (d *corruptObjErrorInterpretingDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) { + // TODO: right now any error is deemed as undecodable, in + // the future, we can apply some filter, if need be. + obj, err := d.Decoder.DecodeListItem(ctx, data, rev, newItemFunc) + if err != nil { + err = &corruptObjectError{err: err, errType: undecodable, revision: int64(rev)} + } + return obj, err +} + +// corruptObjErrorInterpretingTransformer wraps the error returned by the transformer +type corruptObjErrorInterpretingTransformer struct { + value.Transformer +} + +func (t *corruptObjErrorInterpretingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { + // TODO: right now any error is deemed as undecodable, in the future, we + // can apply some filter, if need be. For example, any network error + out, stale, err := t.Transformer.TransformFromStorage(ctx, data, dataCtx) + if err != nil { + err = &corruptObjectError{err: err, errType: untransformable} + } + return out, stale, err +} + +// corruptObjectError is used internally, only by the corrupt object +// deleter, this error represents a corrup object: +// a) the data from the storage failed to transform, or +// b) the data failed to decode into an object +// NOTE: this error does not have any information to identify the object +// that is corrupt, for example the storage key associated with the object +type corruptObjectError struct { + err error + errType int + revision int64 +} + +const ( + untransformable int = iota + 1 + undecodable +) + +var typeToMessage = map[int]string{ + untransformable: "data from the storage is not transformable", + undecodable: "object not decodable", +} + +func (e *corruptObjectError) Unwrap() error { return e.err } +func (e *corruptObjectError) Error() string { + return fmt.Sprintf("%s revision=%d: %v", typeToMessage[e.errType], e.revision, e.err) +} + +// aggregatedStorageError holds an aggregated list of storage.StorageError +type aggregatedStorageError struct { + resourcePrefix string + errs utilerrors.Aggregate +} + +func (e *aggregatedStorageError) Error() string { + errs := e.errs.Errors() + var b strings.Builder + fmt.Fprintf(&b, "unable to transform or decode %d objects: {\n", len(errs)) + for _, err := range errs { + fmt.Fprintf(&b, "\t%s\n", err.Error()) + } + b.WriteString("}") + return b.String() +} + +// NewAPIStatusError creates a new APIStatus object from the +// aggregated list of StorageError +func (e *aggregatedStorageError) NewAPIStatusError(qualifiedResource schema.GroupResource) *apierrors.StatusError { + var causes []metav1.StatusCause + for _, err := range e.errs.Errors() { + var storageErr *storage.StorageError + if errors.As(err, &storageErr) { + causes = append(causes, metav1.StatusCause{ + Type: metav1.CauseTypeUnexpectedServerResponse, + Field: storageErr.Key, + // TODO: do we need to expose the internal error message here? + Message: err.Error(), + }) + continue + } + if errors.Is(err, errTooMany) { + causes = append(causes, metav1.StatusCause{ + Type: metav1.CauseTypeTooMany, + Message: errTooMany.Error(), + }) + } + } + + return &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusInternalServerError, + Reason: metav1.StatusReasonStoreReadError, + Details: &metav1.StatusDetails{ + Group: qualifiedResource.Group, + Kind: qualifiedResource.Resource, + Name: e.resourcePrefix, + Causes: causes, + }, + Message: fmt.Sprintf("failed to read one or more %s from the storage", qualifiedResource.String()), + }, + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 4a4e416a577..f38b160aa15 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -83,6 +83,7 @@ type store struct { watcher *watcher leaseManager *leaseManager decoder Decoder + listErrAggrFactory func() ListErrorAggregator } func (s *store) RequestWatchProgress(ctx context.Context) error { @@ -99,9 +100,49 @@ type objState struct { stale bool } +// ListErrorAggregator aggregates the error(s) that the LIST operation +// encounters while retrieving object(s) from the storage +type ListErrorAggregator interface { + // Aggregate aggregates the given error from list operation + // key: it identifies the given object in the storage. + // err: it represents the error the list operation encountered while + // retrieving the given object from the storage. + // done: true if the aggregation is done and the list operation should + // abort, otherwise the list operation will continue + Aggregate(key string, err error) bool + + // Err returns the aggregated error + Err() error +} + +// defaultListErrorAggregatorFactory returns the default list error +// aggregator that maintains backward compatibility, which is abort +// the list operation as soon as it encounters the first error +func defaultListErrorAggregatorFactory() ListErrorAggregator { return &abortOnFirstError{} } + +// LIST aborts on the first error it encounters (backward compatible) +type abortOnFirstError struct { + err error +} + +func (a *abortOnFirstError) Aggregate(key string, err error) bool { + a.err = err + return true +} +func (a *abortOnFirstError) Err() error { return a.err } + // New returns an etcd3 implementation of storage.Interface. func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface { - return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner) + if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) { + transformer = WithCorruptObjErrorHandlingTransformer(transformer) + decoder = WithCorruptObjErrorHandlingDecoder(decoder) + } + var store storage.Interface + store = newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner) + if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) { + store = NewStoreWithUnsafeCorruptObjectDeletion(store, groupResource) + } + return store } func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store { @@ -114,6 +155,11 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu pathPrefix += "/" } + listErrAggrFactory := defaultListErrorAggregatorFactory + if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) { + listErrAggrFactory = corruptObjErrAggregatorFactory(100) + } + w := &watcher{ client: c.Client, codec: codec, @@ -138,6 +184,7 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu watcher: w, leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig), decoder: decoder, + listErrAggrFactory: listErrAggrFactory, } w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) { @@ -271,6 +318,9 @@ func (s *store) Delete( } skipTransformDecode := false + if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) { + skipTransformDecode = opts.IgnoreStoreReadError + } return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject, skipTransformDecode) } @@ -693,6 +743,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption metricsOp = "list" } + aggregator := s.listErrAggrFactory() for { startTime := time.Now() getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{ @@ -736,7 +787,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(kv.Key)) if err != nil { - return storage.NewInternalError(fmt.Errorf("unable to transform key %q: %w", kv.Key, err)) + if done := aggregator.Aggregate(string(kv.Key), storage.NewInternalError(fmt.Errorf("unable to transform key %q: %w", kv.Key, err))); done { + return aggregator.Err() + } + continue } // Check if the request has already timed out before decode object @@ -750,7 +804,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption obj, err := s.decoder.DecodeListItem(ctx, data, uint64(kv.ModRevision), newItemFunc) if err != nil { recordDecodeError(s.groupResourceString, string(kv.Key)) - return err + if done := aggregator.Aggregate(string(kv.Key), err); done { + return aggregator.Err() + } + continue } // being unable to set the version does not prevent the object from being extracted @@ -784,6 +841,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption } } + if err := aggregator.Err(); err != nil { + return err + } + if v.IsNil() { // Ensure that we never return a nil Items pointer in the result for consistency. v.Set(reflect.MakeSlice(v.Type(), 0, 0)) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 0d6dee0b104..3932f0caee2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -317,4 +317,11 @@ type ListOptions struct { // DeleteOptions provides the options that may be provided for storage delete operations. type DeleteOptions struct { + // IgnoreStoreReadError, if enabled, will ignore store read error + // such as transformation or decode failure and go ahead with the + // deletion of the object. + // NOTE: for normal deletion flow it should always be false, it may be + // enabled by the caller only to facilitate unsafe deletion of corrupt + // object which otherwise can not be deleted using the normal flow + IgnoreStoreReadError bool } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 3bb248674df..49aeaec2bee 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -464,7 +464,8 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu versioner := storage.APIObjectVersioner{} decoder := etcd3.NewDefaultDecoder(c.Codec, versioner) - return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner), destroyFunc, nil + store := etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner) + return store, destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml index a829c62413d..8d0ae633953 100644 --- a/test/featuregates_linter/test_data/versioned_feature_list.yaml +++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml @@ -54,6 +54,12 @@ lockToDefault: true preRelease: Deprecated version: "1.32" +- name: AllowUnsafeMalformedObjectDeletion + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.32" - name: AnonymousAuthConfigurableEndpoints versionedSpecs: - default: false