mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
implement unsafe deletion, and wire it
- implement unsafe deletion, and wire it - aggregate corrupt object error(s) from the storage LIST operation - extend storage error: a) add a new type ErrCodeCorruptObj to represent a corrupt object: b) add a new member 'InnerErr error' to StorageError to hold the inner error - add API status error
This commit is contained in:
parent
aff05b0bca
commit
5d4b4a160d
@ -211,6 +211,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
||||
{Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||
},
|
||||
|
||||
genericfeatures.AllowUnsafeMalformedObjectDeletion: {
|
||||
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
|
||||
},
|
||||
|
||||
genericfeatures.AnonymousAuthConfigurableEndpoints: {
|
||||
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
|
||||
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},
|
||||
|
@ -54,6 +54,7 @@ var knownReasons = map[metav1.StatusReason]struct{}{
|
||||
metav1.StatusReasonGone: {},
|
||||
metav1.StatusReasonInvalid: {},
|
||||
metav1.StatusReasonServerTimeout: {},
|
||||
metav1.StatusReasonStoreReadError: {},
|
||||
metav1.StatusReasonTimeout: {},
|
||||
metav1.StatusReasonTooManyRequests: {},
|
||||
metav1.StatusReasonBadRequest: {},
|
||||
@ -775,6 +776,12 @@ func IsUnexpectedObjectError(err error) bool {
|
||||
return err != nil && (ok || errors.As(err, &uoe))
|
||||
}
|
||||
|
||||
// IsStoreReadError determines if err is due to either failure to transform the
|
||||
// data from the storage, or failure to decode the object appropriately.
|
||||
func IsStoreReadError(err error) bool {
|
||||
return ReasonForError(err) == metav1.StatusReasonStoreReadError
|
||||
}
|
||||
|
||||
// SuggestsClientDelay returns true if this error suggests a client delay as well as the
|
||||
// suggested seconds to wait, or false if the error does not imply a wait. It does not
|
||||
// address whether the error *should* be retried, since some errors (like a 3xx) may
|
||||
|
@ -931,6 +931,22 @@ const (
|
||||
// Status code 500
|
||||
StatusReasonServerTimeout StatusReason = "ServerTimeout"
|
||||
|
||||
// StatusReasonStoreReadError means that the server encountered an error while
|
||||
// retrieving resources from the backend object store.
|
||||
// This may be due to backend database error, or because processing of the read
|
||||
// resource failed.
|
||||
// Details:
|
||||
// "kind" string - the kind attribute of the resource being acted on.
|
||||
// "name" string - the prefix where the reading error(s) occurred
|
||||
// "causes" []StatusCause
|
||||
// - (optional):
|
||||
// - "type" CauseType - CauseTypeUnexpectedServerResponse
|
||||
// - "message" string - the error message from the store backend
|
||||
// - "field" string - the full path with the key of the resource that failed reading
|
||||
//
|
||||
// Status code 500
|
||||
StatusReasonStoreReadError StatusReason = "StorageReadError"
|
||||
|
||||
// StatusReasonTimeout means that the request could not be completed within the given time.
|
||||
// Clients can get this response only when they specified a timeout param in the request,
|
||||
// or if the server cannot complete the operation within a reasonable amount of time.
|
||||
|
@ -118,6 +118,9 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
|
||||
}
|
||||
}
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) && options != nil {
|
||||
options.IgnoreStoreReadErrorWithClusterBreakingPotential = nil
|
||||
}
|
||||
if errs := validation.ValidateDeleteOptions(options); len(errs) > 0 {
|
||||
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", errs)
|
||||
scope.err(err, w, req)
|
||||
@ -125,6 +128,22 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
|
||||
}
|
||||
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("DeleteOptions"))
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
|
||||
if options != nil && ptr.Deref(options.IgnoreStoreReadErrorWithClusterBreakingPotential, false) {
|
||||
// let's make sure that the audit will reflect that this delete request
|
||||
// was tried with ignoreStoreReadErrorWithClusterBreakingPotential enabled
|
||||
audit.AddAuditAnnotation(ctx, "apiserver.k8s.io/unsafe-delete-ignore-read-error", "")
|
||||
|
||||
p, ok := r.(rest.CorruptObjectDeleterProvider)
|
||||
if !ok || p.GetCorruptObjDeleter() == nil {
|
||||
// this is a developer error
|
||||
scope.err(errors.NewInternalError(fmt.Errorf("no unsafe deleter provided, can not honor ignoreStoreReadErrorWithClusterBreakingPotential")), w, req)
|
||||
return
|
||||
}
|
||||
r = p.GetCorruptObjDeleter()
|
||||
}
|
||||
}
|
||||
|
||||
span.AddEvent("About to delete object from database")
|
||||
wasDeleted := true
|
||||
userInfo, _ := request.UserFrom(ctx)
|
||||
@ -262,19 +281,24 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
|
||||
}
|
||||
}
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) && options != nil {
|
||||
options.IgnoreStoreReadErrorWithClusterBreakingPotential = nil
|
||||
}
|
||||
if errs := validation.ValidateDeleteOptions(options); len(errs) > 0 {
|
||||
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", errs)
|
||||
scope.err(err, w, req)
|
||||
return
|
||||
}
|
||||
|
||||
if options != nil && ptr.Deref(options.IgnoreStoreReadErrorWithClusterBreakingPotential, true) {
|
||||
fieldErrList := field.ErrorList{
|
||||
field.Invalid(field.NewPath("ignoreStoreReadErrorWithClusterBreakingPotential"), true, "is not allowed with DELETECOLLECTION, try again after removing the option"),
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
|
||||
if options != nil && ptr.Deref(options.IgnoreStoreReadErrorWithClusterBreakingPotential, true) {
|
||||
fieldErrList := field.ErrorList{
|
||||
field.Invalid(field.NewPath("ignoreStoreReadErrorWithClusterBreakingPotential"), true, "is not allowed with DELETECOLLECTION, try again after removing the option"),
|
||||
}
|
||||
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", fieldErrList)
|
||||
scope.err(err, w, req)
|
||||
return
|
||||
}
|
||||
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", fieldErrList)
|
||||
scope.err(err, w, req)
|
||||
return
|
||||
}
|
||||
|
||||
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("DeleteOptions"))
|
||||
|
@ -54,6 +54,15 @@ const (
|
||||
// Allows us to enable anonymous auth for only certain apiserver endpoints.
|
||||
AnonymousAuthConfigurableEndpoints featuregate.Feature = "AnonymousAuthConfigurableEndpoints"
|
||||
|
||||
// owner: @stlaz @tkashem @dgrisonnet
|
||||
// kep: https://kep.k8s.io/3926
|
||||
//
|
||||
// Enables the cluster admin to identify resources that fail to
|
||||
// decrypt or fail to be decoded into an object, and introduces
|
||||
// a new delete option to allow deletion of such corrupt
|
||||
// resources using the Kubernetes API only.
|
||||
AllowUnsafeMalformedObjectDeletion featuregate.Feature = "AllowUnsafeMalformedObjectDeletion"
|
||||
|
||||
// owner: @smarterclayton
|
||||
// stable: 1.29
|
||||
//
|
||||
@ -264,6 +273,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
||||
{Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||
},
|
||||
|
||||
AllowUnsafeMalformedObjectDeletion: {
|
||||
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
|
||||
},
|
||||
|
||||
AnonymousAuthConfigurableEndpoints: {
|
||||
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
|
||||
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},
|
||||
|
@ -0,0 +1,122 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
storeerr "k8s.io/apiserver/pkg/storage/errors"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
// the corrupt object deleter has the same interface as rest.GracefulDeleter
|
||||
var _ rest.GracefulDeleter = &corruptObjectDeleter{}
|
||||
|
||||
// NewCorruptObjectDeleter returns a deleter that can perform unsafe deletion
|
||||
// of corrupt objects, it makes an attempt to perform a normal deletion flow
|
||||
// first, and if the normal deletion flow fails with a corrupt object error
|
||||
// then it performs the unsafe delete of the object.
|
||||
//
|
||||
// NOTE: it skips precondition checks, finalizer constraints, and any
|
||||
// post deletion hook defined in 'AfterDelete' of the registry.
|
||||
//
|
||||
// WARNING: This may break the cluster if the resource being deleted has dependencies.
|
||||
func NewCorruptObjectDeleter(store *Store) rest.GracefulDeleter {
|
||||
return &corruptObjectDeleter{store: store}
|
||||
}
|
||||
|
||||
// corruptObjectDeleter implements unsafe object deletion flow
|
||||
type corruptObjectDeleter struct {
|
||||
store *Store
|
||||
}
|
||||
|
||||
// Delete performs an unsafe deletion of the given resource from the storage.
|
||||
//
|
||||
// NOTE: This function should NEVER be used for any normal deletion
|
||||
// flow, it is exclusively used when the user enables
|
||||
// 'IgnoreStoreReadErrorWithClusterBreakingPotential' in the delete options.
|
||||
func (d *corruptObjectDeleter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, opts *metav1.DeleteOptions) (runtime.Object, bool, error) {
|
||||
if opts == nil || !ptr.Deref[bool](opts.IgnoreStoreReadErrorWithClusterBreakingPotential, false) {
|
||||
// this is a developer error, we should never be here, since the unsafe
|
||||
// deleter is wired in the rest layer only when the option is enabled
|
||||
return nil, false, apierrors.NewInternalError(errors.New("initialization error, expected normal deletion flow to be used"))
|
||||
}
|
||||
|
||||
key, err := d.store.KeyFunc(ctx, name)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
obj := d.store.NewFunc()
|
||||
qualifiedResource := d.store.qualifiedResourceFromContext(ctx)
|
||||
// use the storage implementation directly, bypass the dryRun layer
|
||||
storageBackend := d.store.Storage.Storage
|
||||
// we leave ResourceVersion as empty in the GetOptions so the
|
||||
// object is retrieved from the underlying storage directly
|
||||
err = storageBackend.Get(ctx, key, storage.GetOptions{}, obj)
|
||||
if err == nil || !storage.IsCorruptObject(err) {
|
||||
// TODO: The Invalid error should have a field for Resource.
|
||||
// After that field is added, we should fill the Resource and
|
||||
// leave the Kind field empty. See the discussion in #18526.
|
||||
qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
|
||||
fieldErrList := field.ErrorList{
|
||||
field.Invalid(field.NewPath("ignoreStoreReadErrorWithClusterBreakingPotential"), true, "is exclusively used to delete corrupt object(s), try again by removing this option"),
|
||||
}
|
||||
return nil, false, apierrors.NewInvalid(qualifiedKind, name, fieldErrList)
|
||||
}
|
||||
|
||||
// try normal deletion anyway, it is expected to fail
|
||||
obj, deleted, err := d.store.Delete(ctx, name, deleteValidation, opts)
|
||||
if err == nil {
|
||||
return obj, deleted, err
|
||||
}
|
||||
// TODO: unfortunately we can't do storage.IsCorruptObject(err),
|
||||
// conversion to API error drops the inner error chain
|
||||
if !strings.Contains(err.Error(), "corrupt object") {
|
||||
return obj, deleted, err
|
||||
}
|
||||
|
||||
// TODO: at this instant, some actor may have a) managed to recreate this
|
||||
// object by doing a delete+create, or b) the underlying error has resolved
|
||||
// since the last time we checked, and the object is readable now.
|
||||
klog.FromContext(ctx).V(1).Info("Going to perform unsafe object deletion", "object", klog.KRef(genericapirequest.NamespaceValue(ctx), name))
|
||||
out := d.store.NewFunc()
|
||||
storageOpts := storage.DeleteOptions{IgnoreStoreReadError: true}
|
||||
// dropping preconditions, and keeping the admission
|
||||
if err := storageBackend.Delete(ctx, key, out, nil, storage.ValidateObjectFunc(deleteValidation), nil, storageOpts); err != nil {
|
||||
if storage.IsNotFound(err) {
|
||||
// the DELETE succeeded, but we don't have the object since it's
|
||||
// not retrievable from the storage, so we send a nil object
|
||||
return nil, false, nil
|
||||
}
|
||||
return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
|
||||
}
|
||||
// the DELETE succeeded, but we don't have the object sine it's
|
||||
// not retrievable from the storage, so we send a nil objct
|
||||
return nil, true, nil
|
||||
}
|
@ -0,0 +1,288 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
type result struct {
|
||||
deleted bool
|
||||
err error
|
||||
}
|
||||
|
||||
type deleteWant struct {
|
||||
deleted bool
|
||||
checkErr func(err error) bool
|
||||
}
|
||||
|
||||
var (
|
||||
wantNoError = func(err error) bool { return err == nil }
|
||||
wantErrContains = func(shouldContain string) func(error) bool {
|
||||
return func(err error) bool {
|
||||
return err != nil && strings.Contains(err.Error(), shouldContain)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
func (w deleteWant) verify(t *testing.T, got result) {
|
||||
t.Helper()
|
||||
|
||||
if !w.checkErr(got.err) {
|
||||
t.Errorf("Unexpected failure with the deletion operation, got: %v", got.err)
|
||||
}
|
||||
if w.deleted != got.deleted {
|
||||
t.Errorf("Expected deleted to be: %t, but got: %t", w.deleted, got.deleted)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestUnsafeDeletePrecondition(t *testing.T) {
|
||||
option := func(enabled bool) *metav1.DeleteOptions {
|
||||
return &metav1.DeleteOptions{
|
||||
IgnoreStoreReadErrorWithClusterBreakingPotential: ptr.To[bool](enabled),
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
unsafeDeleteNotAllowed = "ignoreStoreReadErrorWithClusterBreakingPotential: Invalid value: true: is exclusively used to delete corrupt object(s), try again by removing this option"
|
||||
internalErr = "Internal error occurred: initialization error, expected normal deletion flow to be used"
|
||||
)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
err error
|
||||
opts *metav1.DeleteOptions
|
||||
invoked int
|
||||
want deleteWant
|
||||
}{
|
||||
{
|
||||
name: "option nil, should throw internal error",
|
||||
opts: nil,
|
||||
want: deleteWant{checkErr: wantErrContains(internalErr)},
|
||||
},
|
||||
{
|
||||
name: "option empty, should throw internal error",
|
||||
opts: &metav1.DeleteOptions{},
|
||||
want: deleteWant{checkErr: wantErrContains(internalErr)},
|
||||
},
|
||||
{
|
||||
name: "option false, should throw internal error",
|
||||
opts: option(false),
|
||||
want: deleteWant{checkErr: wantErrContains(internalErr)},
|
||||
},
|
||||
{
|
||||
name: "option true, object readable, should throw invalid error",
|
||||
opts: option(true),
|
||||
want: deleteWant{
|
||||
checkErr: wantErrContains(unsafeDeleteNotAllowed),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "option true, object not readable with unexpected error, should throw invalid error",
|
||||
opts: option(true),
|
||||
err: fmt.Errorf("unexpected error"),
|
||||
want: deleteWant{
|
||||
checkErr: wantErrContains(unsafeDeleteNotAllowed),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "option true, object not readable with storage internal error, should throw invalid error",
|
||||
opts: option(true),
|
||||
err: storage.NewInternalError(fmt.Errorf("unexpected error")),
|
||||
want: deleteWant{
|
||||
checkErr: wantErrContains(unsafeDeleteNotAllowed),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "option true, object not readable with corrupt object error, unsafe-delete should trigger",
|
||||
opts: option(true),
|
||||
err: storage.NewCorruptObjError("foo", fmt.Errorf("object not decodable")),
|
||||
want: deleteWant{
|
||||
deleted: true,
|
||||
checkErr: wantNoError,
|
||||
},
|
||||
invoked: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
defer destroyFunc()
|
||||
|
||||
object := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
|
||||
Spec: example.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
_, err := registry.Create(ctx, object, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error from Create: %v", err)
|
||||
}
|
||||
|
||||
// wrap the storage so it returns the expected error
|
||||
cs := &corruptStorage{
|
||||
Interface: registry.Storage.Storage,
|
||||
err: test.err,
|
||||
}
|
||||
registry.Storage.Storage = cs
|
||||
deleter := NewCorruptObjectDeleter(registry)
|
||||
|
||||
_, deleted, err := deleter.Delete(ctx, "foo", rest.ValidateAllObjectFunc, test.opts)
|
||||
|
||||
got := result{deleted: deleted, err: err}
|
||||
test.want.verify(t, got)
|
||||
if want, got := test.invoked, cs.unsafeDeleteInvoked; want != got {
|
||||
t.Errorf("Expected unsafe-delete to be invoked %d time(s), but got: %d", want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsafeDeleteWithCorruptObject(t *testing.T) {
|
||||
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
defer destroyFunc()
|
||||
|
||||
object := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
|
||||
Spec: example.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
// a) prerequisite: try deleting the object, we expect a not found error
|
||||
_, _, err := registry.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil)
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// b) create the target object
|
||||
_, err = registry.Create(ctx, object, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// c) wrap the storage to return corrupt object error
|
||||
cs := &corruptStorage{
|
||||
Interface: registry.Storage.Storage,
|
||||
err: storage.NewCorruptObjError("key", fmt.Errorf("untransformable")),
|
||||
}
|
||||
registry.Storage.Storage = cs
|
||||
|
||||
got := result{}
|
||||
// d) try deleting the traget object
|
||||
_, got.deleted, got.err = registry.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil)
|
||||
want := deleteWant{checkErr: errors.IsInternalError}
|
||||
want.verify(t, got)
|
||||
|
||||
// e) set up an unsafe-deleter
|
||||
deleter := NewCorruptObjectDeleter(registry)
|
||||
|
||||
// f) try to delete the object, but don't set the delete option just yet
|
||||
_, got.deleted, got.err = deleter.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil)
|
||||
want.verify(t, got)
|
||||
|
||||
// g) this time, set the delete option to ignore store read error
|
||||
_, got.deleted, got.err = deleter.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{
|
||||
IgnoreStoreReadErrorWithClusterBreakingPotential: ptr.To[bool](true),
|
||||
})
|
||||
want = deleteWant{
|
||||
deleted: true,
|
||||
checkErr: wantNoError,
|
||||
}
|
||||
want.verify(t, got)
|
||||
if want, got := 1, cs.unsafeDeleteInvoked; want != got {
|
||||
t.Errorf("Expected unsafe-delete to be invoked %d time(s), but got: %d", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsafeDeleteWithUnexpectedError(t *testing.T) {
|
||||
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
// TODO: inject a corrupt transformer
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
defer destroyFunc()
|
||||
|
||||
object := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
|
||||
Spec: example.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
// a) create the target object
|
||||
_, err := registry.Create(ctx, object, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// b) wrap the storage to return corrupt object error
|
||||
cs := &corruptStorage{
|
||||
Interface: registry.Storage.Storage,
|
||||
err: storage.NewInternalError(fmt.Errorf("unexpected error")),
|
||||
}
|
||||
registry.Storage.Storage = cs
|
||||
|
||||
// c) try deleting the object using normal deletion flow
|
||||
got := result{}
|
||||
_, got.deleted, got.err = registry.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, nil)
|
||||
want := deleteWant{checkErr: errors.IsInternalError}
|
||||
want.verify(t, got)
|
||||
|
||||
// d) set up a corrupt object deleter for the registry
|
||||
deleter := NewCorruptObjectDeleter(registry)
|
||||
|
||||
// e) try deleting with unsafe-delete
|
||||
_, got.deleted, got.err = deleter.Delete(ctx, object.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{
|
||||
IgnoreStoreReadErrorWithClusterBreakingPotential: ptr.To[bool](true),
|
||||
})
|
||||
want = deleteWant{
|
||||
checkErr: wantErrContains("is exclusively used to delete corrupt object(s), try again by removing this option"),
|
||||
}
|
||||
want.verify(t, got)
|
||||
if want, got := 0, cs.unsafeDeleteInvoked; want != got {
|
||||
t.Errorf("Expected unsafe-delete to be invoked %d time(s), but got: %d", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
type corruptStorage struct {
|
||||
storage.Interface
|
||||
err error
|
||||
unsafeDeleteInvoked int
|
||||
}
|
||||
|
||||
func (s *corruptStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
|
||||
if s.err != nil {
|
||||
return s.err
|
||||
}
|
||||
return s.Interface.Get(ctx, key, opts, objPtr)
|
||||
}
|
||||
|
||||
func (s *corruptStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
|
||||
if opts.IgnoreStoreReadError {
|
||||
s.unsafeDeleteInvoked++
|
||||
}
|
||||
return s.Interface.Delete(ctx, key, out, preconditions, deleteValidation, cachedExistingObject, opts)
|
||||
}
|
@ -234,6 +234,18 @@ type Store struct {
|
||||
// If set, DestroyFunc has to be implemented in thread-safe way and
|
||||
// be prepared for being called more than once.
|
||||
DestroyFunc func()
|
||||
|
||||
// corruptObjDeleter implements unsafe deletion flow to enable deletion
|
||||
// of corrupt object(s), it makes an attempt to perform a normal
|
||||
// deletion flow first, and if the normal deletion flow fails with a
|
||||
// corrupt object error then it proceeds with the unsafe deletion
|
||||
// of the object from the storage.
|
||||
// NOTE: it skips precondition checks, finalizer constraints, and any
|
||||
// after delete hook defined in 'AfterDelete' of the registry.
|
||||
// WARNING: This may break the cluster if the resource has
|
||||
// dependencies. Use when the cluster is broken, and there is no
|
||||
// other viable option to repair the cluster.
|
||||
corruptObjDeleter rest.GracefulDeleter
|
||||
}
|
||||
|
||||
// Note: the rest.StandardStorage interface aggregates the common REST verbs
|
||||
@ -244,6 +256,8 @@ var _ GenericStore = &Store{}
|
||||
|
||||
var _ rest.SingularNameProvider = &Store{}
|
||||
|
||||
var _ rest.CorruptObjectDeleterProvider = &Store{}
|
||||
|
||||
const (
|
||||
OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
|
||||
resourceCountPollPeriodJitter = 1.2
|
||||
@ -344,6 +358,11 @@ func (e *Store) GetDeleteStrategy() rest.RESTDeleteStrategy {
|
||||
return e.DeleteStrategy
|
||||
}
|
||||
|
||||
// GetCorruptObjDeleter returns the unsafe corrupt object deleter
|
||||
func (e *Store) GetCorruptObjDeleter() rest.GracefulDeleter {
|
||||
return e.corruptObjDeleter
|
||||
}
|
||||
|
||||
// List returns a list of items matching labels and field according to the
|
||||
// store's PredicateFunc.
|
||||
func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
|
||||
@ -1631,6 +1650,10 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
||||
e.ReadinessCheckFunc = e.Storage.Storage.ReadinessCheck
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
|
||||
e.corruptObjDeleter = NewCorruptObjectDeleter(e)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -407,3 +407,10 @@ type UpdateResetFieldsStrategy interface {
|
||||
RESTUpdateStrategy
|
||||
ResetFieldsStrategy
|
||||
}
|
||||
|
||||
// CorruptObjectDeleterProvider is an interface the storage implements
|
||||
// to support unsafe deletion of corrupt object(s). It returns a
|
||||
// GracefulDeleter that is used to perform unsafe deletion of corrupt object(s).
|
||||
type CorruptObjectDeleterProvider interface {
|
||||
GetCorruptObjDeleter() GracefulDeleter
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ const (
|
||||
ErrCodeInvalidObj
|
||||
ErrCodeUnreachable
|
||||
ErrCodeTimeout
|
||||
ErrCodeCorruptObj
|
||||
)
|
||||
|
||||
var errCodeToMessage = map[int]string{
|
||||
@ -46,6 +47,7 @@ var errCodeToMessage = map[int]string{
|
||||
ErrCodeInvalidObj: "invalid object",
|
||||
ErrCodeUnreachable: "server unreachable",
|
||||
ErrCodeTimeout: "request timeout",
|
||||
ErrCodeCorruptObj: "corrupt object",
|
||||
}
|
||||
|
||||
func NewKeyNotFoundError(key string, rv int64) *StorageError {
|
||||
@ -82,30 +84,45 @@ func NewUnreachableError(key string, rv int64) *StorageError {
|
||||
|
||||
func NewTimeoutError(key, msg string) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeTimeout,
|
||||
Key: key,
|
||||
AdditionalErrorMsg: msg,
|
||||
Code: ErrCodeTimeout,
|
||||
Key: key,
|
||||
err: errors.New(msg),
|
||||
}
|
||||
}
|
||||
|
||||
func NewInvalidObjError(key, msg string) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeInvalidObj,
|
||||
Key: key,
|
||||
AdditionalErrorMsg: msg,
|
||||
Code: ErrCodeInvalidObj,
|
||||
Key: key,
|
||||
err: errors.New(msg),
|
||||
}
|
||||
}
|
||||
|
||||
// NewCorruptObjError returns a new StorageError, it represents a corrupt object:
|
||||
// a) object data retrieved from the storage failed to transform with the given err.
|
||||
// b) the given object failed to decode with the given err
|
||||
func NewCorruptObjError(key string, err error) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeCorruptObj,
|
||||
Key: key,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
type StorageError struct {
|
||||
Code int
|
||||
Key string
|
||||
ResourceVersion int64
|
||||
AdditionalErrorMsg string
|
||||
Code int
|
||||
Key string
|
||||
ResourceVersion int64
|
||||
|
||||
// inner error
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *StorageError) Unwrap() error { return e.err }
|
||||
|
||||
func (e *StorageError) Error() string {
|
||||
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
|
||||
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
|
||||
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %v",
|
||||
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.err)
|
||||
}
|
||||
|
||||
// IsNotFound returns true if and only if err is "key" not found error.
|
||||
@ -138,6 +155,21 @@ func IsInvalidObj(err error) bool {
|
||||
return isErrCode(err, ErrCodeInvalidObj)
|
||||
}
|
||||
|
||||
// IsCorruptObject returns true if and only if:
|
||||
// a) the given object data retrieved from the storage is not transformable, or
|
||||
// b) the given object failed to decode properly
|
||||
func IsCorruptObject(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
var storageErr *StorageError
|
||||
if !errors.As(err, &storageErr) {
|
||||
return false
|
||||
}
|
||||
|
||||
return storageErr.Code == ErrCodeCorruptObj
|
||||
}
|
||||
|
||||
func isErrCode(err error, code int) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
|
@ -32,6 +32,8 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error
|
||||
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
|
||||
case storage.IsInternalError(err):
|
||||
return errors.NewInternalError(err)
|
||||
case storage.IsCorruptObject(err):
|
||||
return errors.NewInternalError(err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
@ -47,6 +49,8 @@ func InterpretGetError(err error, qualifiedResource schema.GroupResource, name s
|
||||
return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level
|
||||
case storage.IsInternalError(err):
|
||||
return errors.NewInternalError(err)
|
||||
case storage.IsCorruptObject(err):
|
||||
return errors.NewInternalError(err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
@ -62,6 +66,8 @@ func InterpretCreateError(err error, qualifiedResource schema.GroupResource, nam
|
||||
return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level
|
||||
case storage.IsInternalError(err):
|
||||
return errors.NewInternalError(err)
|
||||
case storage.IsCorruptObject(err):
|
||||
return errors.NewInternalError(err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
@ -79,6 +85,8 @@ func InterpretUpdateError(err error, qualifiedResource schema.GroupResource, nam
|
||||
return errors.NewNotFound(qualifiedResource, name)
|
||||
case storage.IsInternalError(err):
|
||||
return errors.NewInternalError(err)
|
||||
case storage.IsCorruptObject(err):
|
||||
return errors.NewInternalError(err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
@ -96,6 +104,8 @@ func InterpretDeleteError(err error, qualifiedResource schema.GroupResource, nam
|
||||
return errors.NewConflict(qualifiedResource, name, err)
|
||||
case storage.IsInternalError(err):
|
||||
return errors.NewInternalError(err)
|
||||
case storage.IsCorruptObject(err):
|
||||
return errors.NewInternalError(err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
@ -110,6 +120,8 @@ func InterpretWatchError(err error, resource schema.GroupResource, name string)
|
||||
return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs)
|
||||
case storage.IsInternalError(err):
|
||||
return errors.NewInternalError(err)
|
||||
case storage.IsCorruptObject(err):
|
||||
return errors.NewInternalError(err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
@ -0,0 +1,270 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// NewStoreWithUnsafeCorruptObjectDeletion wraps the given store implementation
|
||||
// and adds support for unsafe deletion of corrupt objects
|
||||
func NewStoreWithUnsafeCorruptObjectDeletion(delegate storage.Interface, gr schema.GroupResource) storage.Interface {
|
||||
return &corruptObjectDeleter{
|
||||
Interface: delegate,
|
||||
groupResource: gr,
|
||||
}
|
||||
}
|
||||
|
||||
// WithCorruptObjErrorHandlingDecoder decorates the given decoder, it determines
|
||||
// if the error returned by the given decoder represents a corrupt object (the
|
||||
// object is undecodable), and then it wraps the error appropriately so the
|
||||
// unsafe deleter can determine if the object is a candidate for unsafe deletion
|
||||
func WithCorruptObjErrorHandlingDecoder(decoder Decoder) Decoder {
|
||||
return &corruptObjErrorInterpretingDecoder{Decoder: decoder}
|
||||
}
|
||||
|
||||
// WithCorruptObjErrorHandlingTransformer decorates the given decoder, it
|
||||
// determines if the error returned by the given transformer represents a
|
||||
// corrupt object (the data from the storage is untransformable), and then it
|
||||
// wraps the error appropriately so the unsafe deleter can determine
|
||||
// if the object is a candidate for unsafe deletion
|
||||
func WithCorruptObjErrorHandlingTransformer(transformer value.Transformer) value.Transformer {
|
||||
return &corruptObjErrorInterpretingTransformer{Transformer: transformer}
|
||||
}
|
||||
|
||||
// corruptObjErrAggregatorFactory returns an error aggregator that aggregates
|
||||
// corrupt object error(s) that the list operation encounters while
|
||||
// retrieving objects from the storage.
|
||||
// maxCount: it is the maximum number of error that will be aggregated
|
||||
func corruptObjErrAggregatorFactory(maxCount int) func() ListErrorAggregator {
|
||||
if maxCount <= 0 {
|
||||
return defaultListErrorAggregatorFactory
|
||||
}
|
||||
return func() ListErrorAggregator {
|
||||
return &corruptObjErrAggregator{maxCount: maxCount}
|
||||
}
|
||||
}
|
||||
|
||||
var errTooMany = errors.New("too many errors, the list is truncated")
|
||||
|
||||
// aggregate corrupt object errors from the LIST operation
|
||||
type corruptObjErrAggregator struct {
|
||||
errs []error
|
||||
abortErr error
|
||||
maxCount int
|
||||
}
|
||||
|
||||
func (a *corruptObjErrAggregator) Aggregate(key string, err error) bool {
|
||||
if len(a.errs) >= a.maxCount {
|
||||
// add a sentinel error to indicate there are more
|
||||
a.errs = append(a.errs, errTooMany)
|
||||
return true
|
||||
}
|
||||
var corruptObjErr *corruptObjectError
|
||||
if errors.As(err, &corruptObjErr) {
|
||||
a.errs = append(a.errs, storage.NewCorruptObjError(key, corruptObjErr))
|
||||
return false
|
||||
}
|
||||
|
||||
// not a corrupt object error, the list operation should abort
|
||||
a.abortErr = err
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *corruptObjErrAggregator) Err() error {
|
||||
switch {
|
||||
case len(a.errs) == 0 && a.abortErr != nil:
|
||||
return a.abortErr
|
||||
case len(a.errs) > 0:
|
||||
err := utilerrors.NewAggregate(a.errs)
|
||||
return &aggregatedStorageError{errs: err, resourcePrefix: "list"}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// corruptObjectDeleter facilitates unsafe deletion of corrupt objects for etcd
|
||||
type corruptObjectDeleter struct {
|
||||
storage.Interface
|
||||
groupResource schema.GroupResource
|
||||
}
|
||||
|
||||
func (s *corruptObjectDeleter) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
|
||||
if err := s.Interface.Get(ctx, key, opts, out); err != nil {
|
||||
var corruptObjErr *corruptObjectError
|
||||
if !errors.As(err, &corruptObjErr) {
|
||||
// this error does not represent a corrupt object
|
||||
return err
|
||||
}
|
||||
// the unsafe deleter at the registry layer will check whether
|
||||
// the given err represents a corrupt object in order to
|
||||
// initiate the unsafe deletion flow.
|
||||
return storage.NewCorruptObjError(key, corruptObjErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *corruptObjectDeleter) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
err := s.Interface.GetList(ctx, key, opts, listObj)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var aggregatedErr *aggregatedStorageError
|
||||
if errors.As(err, &aggregatedErr) {
|
||||
// we have aggregated a list of corrupt objects
|
||||
klog.V(5).ErrorS(aggregatedErr, "corrupt objects")
|
||||
return aggregatedErr.NewAPIStatusError(s.groupResource)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// corruptObjErrorInterpretingDecoder wraps the error returned by the decorated decoder
|
||||
type corruptObjErrorInterpretingDecoder struct {
|
||||
Decoder
|
||||
}
|
||||
|
||||
func (d *corruptObjErrorInterpretingDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error {
|
||||
// TODO: right now any error is deemed as undecodable, in
|
||||
// the future, we can apply some filter, if need be.
|
||||
if err := d.Decoder.Decode(value, objPtr, rev); err != nil {
|
||||
return &corruptObjectError{err: err, errType: undecodable, revision: rev}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeListItem decodes bytes value in array into object.
|
||||
func (d *corruptObjErrorInterpretingDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) {
|
||||
// TODO: right now any error is deemed as undecodable, in
|
||||
// the future, we can apply some filter, if need be.
|
||||
obj, err := d.Decoder.DecodeListItem(ctx, data, rev, newItemFunc)
|
||||
if err != nil {
|
||||
err = &corruptObjectError{err: err, errType: undecodable, revision: int64(rev)}
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
|
||||
// corruptObjErrorInterpretingTransformer wraps the error returned by the transformer
|
||||
type corruptObjErrorInterpretingTransformer struct {
|
||||
value.Transformer
|
||||
}
|
||||
|
||||
func (t *corruptObjErrorInterpretingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
// TODO: right now any error is deemed as undecodable, in the future, we
|
||||
// can apply some filter, if need be. For example, any network error
|
||||
out, stale, err := t.Transformer.TransformFromStorage(ctx, data, dataCtx)
|
||||
if err != nil {
|
||||
err = &corruptObjectError{err: err, errType: untransformable}
|
||||
}
|
||||
return out, stale, err
|
||||
}
|
||||
|
||||
// corruptObjectError is used internally, only by the corrupt object
|
||||
// deleter, this error represents a corrup object:
|
||||
// a) the data from the storage failed to transform, or
|
||||
// b) the data failed to decode into an object
|
||||
// NOTE: this error does not have any information to identify the object
|
||||
// that is corrupt, for example the storage key associated with the object
|
||||
type corruptObjectError struct {
|
||||
err error
|
||||
errType int
|
||||
revision int64
|
||||
}
|
||||
|
||||
const (
|
||||
untransformable int = iota + 1
|
||||
undecodable
|
||||
)
|
||||
|
||||
var typeToMessage = map[int]string{
|
||||
untransformable: "data from the storage is not transformable",
|
||||
undecodable: "object not decodable",
|
||||
}
|
||||
|
||||
func (e *corruptObjectError) Unwrap() error { return e.err }
|
||||
func (e *corruptObjectError) Error() string {
|
||||
return fmt.Sprintf("%s revision=%d: %v", typeToMessage[e.errType], e.revision, e.err)
|
||||
}
|
||||
|
||||
// aggregatedStorageError holds an aggregated list of storage.StorageError
|
||||
type aggregatedStorageError struct {
|
||||
resourcePrefix string
|
||||
errs utilerrors.Aggregate
|
||||
}
|
||||
|
||||
func (e *aggregatedStorageError) Error() string {
|
||||
errs := e.errs.Errors()
|
||||
var b strings.Builder
|
||||
fmt.Fprintf(&b, "unable to transform or decode %d objects: {\n", len(errs))
|
||||
for _, err := range errs {
|
||||
fmt.Fprintf(&b, "\t%s\n", err.Error())
|
||||
}
|
||||
b.WriteString("}")
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// NewAPIStatusError creates a new APIStatus object from the
|
||||
// aggregated list of StorageError
|
||||
func (e *aggregatedStorageError) NewAPIStatusError(qualifiedResource schema.GroupResource) *apierrors.StatusError {
|
||||
var causes []metav1.StatusCause
|
||||
for _, err := range e.errs.Errors() {
|
||||
var storageErr *storage.StorageError
|
||||
if errors.As(err, &storageErr) {
|
||||
causes = append(causes, metav1.StatusCause{
|
||||
Type: metav1.CauseTypeUnexpectedServerResponse,
|
||||
Field: storageErr.Key,
|
||||
// TODO: do we need to expose the internal error message here?
|
||||
Message: err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, errTooMany) {
|
||||
causes = append(causes, metav1.StatusCause{
|
||||
Type: metav1.CauseTypeTooMany,
|
||||
Message: errTooMany.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return &apierrors.StatusError{
|
||||
ErrStatus: metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Code: http.StatusInternalServerError,
|
||||
Reason: metav1.StatusReasonStoreReadError,
|
||||
Details: &metav1.StatusDetails{
|
||||
Group: qualifiedResource.Group,
|
||||
Kind: qualifiedResource.Resource,
|
||||
Name: e.resourcePrefix,
|
||||
Causes: causes,
|
||||
},
|
||||
Message: fmt.Sprintf("failed to read one or more %s from the storage", qualifiedResource.String()),
|
||||
},
|
||||
}
|
||||
}
|
@ -83,6 +83,7 @@ type store struct {
|
||||
watcher *watcher
|
||||
leaseManager *leaseManager
|
||||
decoder Decoder
|
||||
listErrAggrFactory func() ListErrorAggregator
|
||||
}
|
||||
|
||||
func (s *store) RequestWatchProgress(ctx context.Context) error {
|
||||
@ -99,9 +100,49 @@ type objState struct {
|
||||
stale bool
|
||||
}
|
||||
|
||||
// ListErrorAggregator aggregates the error(s) that the LIST operation
|
||||
// encounters while retrieving object(s) from the storage
|
||||
type ListErrorAggregator interface {
|
||||
// Aggregate aggregates the given error from list operation
|
||||
// key: it identifies the given object in the storage.
|
||||
// err: it represents the error the list operation encountered while
|
||||
// retrieving the given object from the storage.
|
||||
// done: true if the aggregation is done and the list operation should
|
||||
// abort, otherwise the list operation will continue
|
||||
Aggregate(key string, err error) bool
|
||||
|
||||
// Err returns the aggregated error
|
||||
Err() error
|
||||
}
|
||||
|
||||
// defaultListErrorAggregatorFactory returns the default list error
|
||||
// aggregator that maintains backward compatibility, which is abort
|
||||
// the list operation as soon as it encounters the first error
|
||||
func defaultListErrorAggregatorFactory() ListErrorAggregator { return &abortOnFirstError{} }
|
||||
|
||||
// LIST aborts on the first error it encounters (backward compatible)
|
||||
type abortOnFirstError struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (a *abortOnFirstError) Aggregate(key string, err error) bool {
|
||||
a.err = err
|
||||
return true
|
||||
}
|
||||
func (a *abortOnFirstError) Err() error { return a.err }
|
||||
|
||||
// New returns an etcd3 implementation of storage.Interface.
|
||||
func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
|
||||
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
|
||||
transformer = WithCorruptObjErrorHandlingTransformer(transformer)
|
||||
decoder = WithCorruptObjErrorHandlingDecoder(decoder)
|
||||
}
|
||||
var store storage.Interface
|
||||
store = newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
|
||||
store = NewStoreWithUnsafeCorruptObjectDeletion(store, groupResource)
|
||||
}
|
||||
return store
|
||||
}
|
||||
|
||||
func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
|
||||
@ -114,6 +155,11 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu
|
||||
pathPrefix += "/"
|
||||
}
|
||||
|
||||
listErrAggrFactory := defaultListErrorAggregatorFactory
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
|
||||
listErrAggrFactory = corruptObjErrAggregatorFactory(100)
|
||||
}
|
||||
|
||||
w := &watcher{
|
||||
client: c.Client,
|
||||
codec: codec,
|
||||
@ -138,6 +184,7 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu
|
||||
watcher: w,
|
||||
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
|
||||
decoder: decoder,
|
||||
listErrAggrFactory: listErrAggrFactory,
|
||||
}
|
||||
|
||||
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
|
||||
@ -271,6 +318,9 @@ func (s *store) Delete(
|
||||
}
|
||||
|
||||
skipTransformDecode := false
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
|
||||
skipTransformDecode = opts.IgnoreStoreReadError
|
||||
}
|
||||
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject, skipTransformDecode)
|
||||
}
|
||||
|
||||
@ -693,6 +743,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
metricsOp = "list"
|
||||
}
|
||||
|
||||
aggregator := s.listErrAggrFactory()
|
||||
for {
|
||||
startTime := time.Now()
|
||||
getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{
|
||||
@ -736,7 +787,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
|
||||
data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(kv.Key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(fmt.Errorf("unable to transform key %q: %w", kv.Key, err))
|
||||
if done := aggregator.Aggregate(string(kv.Key), storage.NewInternalError(fmt.Errorf("unable to transform key %q: %w", kv.Key, err))); done {
|
||||
return aggregator.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the request has already timed out before decode object
|
||||
@ -750,7 +804,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
obj, err := s.decoder.DecodeListItem(ctx, data, uint64(kv.ModRevision), newItemFunc)
|
||||
if err != nil {
|
||||
recordDecodeError(s.groupResourceString, string(kv.Key))
|
||||
return err
|
||||
if done := aggregator.Aggregate(string(kv.Key), err); done {
|
||||
return aggregator.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
@ -784,6 +841,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
}
|
||||
}
|
||||
|
||||
if err := aggregator.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if v.IsNil() {
|
||||
// Ensure that we never return a nil Items pointer in the result for consistency.
|
||||
v.Set(reflect.MakeSlice(v.Type(), 0, 0))
|
||||
|
@ -317,4 +317,11 @@ type ListOptions struct {
|
||||
|
||||
// DeleteOptions provides the options that may be provided for storage delete operations.
|
||||
type DeleteOptions struct {
|
||||
// IgnoreStoreReadError, if enabled, will ignore store read error
|
||||
// such as transformation or decode failure and go ahead with the
|
||||
// deletion of the object.
|
||||
// NOTE: for normal deletion flow it should always be false, it may be
|
||||
// enabled by the caller only to facilitate unsafe deletion of corrupt
|
||||
// object which otherwise can not be deleted using the normal flow
|
||||
IgnoreStoreReadError bool
|
||||
}
|
||||
|
@ -464,7 +464,8 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
|
||||
|
||||
versioner := storage.APIObjectVersioner{}
|
||||
decoder := etcd3.NewDefaultDecoder(c.Codec, versioner)
|
||||
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner), destroyFunc, nil
|
||||
store := etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner)
|
||||
return store, destroyFunc, nil
|
||||
}
|
||||
|
||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||
|
@ -54,6 +54,12 @@
|
||||
lockToDefault: true
|
||||
preRelease: Deprecated
|
||||
version: "1.32"
|
||||
- name: AllowUnsafeMalformedObjectDeletion
|
||||
versionedSpecs:
|
||||
- default: false
|
||||
lockToDefault: false
|
||||
preRelease: Alpha
|
||||
version: "1.32"
|
||||
- name: AnonymousAuthConfigurableEndpoints
|
||||
versionedSpecs:
|
||||
- default: false
|
||||
|
Loading…
Reference in New Issue
Block a user