mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
refactor etcd store conditional delete
This commit is contained in:
parent
a6ea7b8218
commit
fecab0713b
@ -271,13 +271,15 @@ func (s *store) Delete(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to convert output object to pointer: %v", err)
|
return fmt.Errorf("unable to convert output object to pointer: %v", err)
|
||||||
}
|
}
|
||||||
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject)
|
|
||||||
|
skipTransformDecode := false
|
||||||
|
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject, skipTransformDecode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) conditionalDelete(
|
func (s *store) conditionalDelete(
|
||||||
ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
|
ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
|
||||||
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
|
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, skipTransformDecode bool) error {
|
||||||
getCurrentState := s.getCurrentState(ctx, key, v, false)
|
getCurrentState := s.getCurrentState(ctx, key, v, false, skipTransformDecode)
|
||||||
|
|
||||||
var origState *objState
|
var origState *objState
|
||||||
var err error
|
var err error
|
||||||
@ -361,7 +363,7 @@ func (s *store) conditionalDelete(
|
|||||||
if !txnResp.Succeeded {
|
if !txnResp.Succeeded {
|
||||||
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
||||||
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
|
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
|
||||||
origState, err = s.getState(ctx, getResp, key, v, false)
|
origState, err = s.getState(ctx, getResp, key, v, false, skipTransformDecode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -376,11 +378,13 @@ func (s *store) conditionalDelete(
|
|||||||
if deleteResp.Header == nil {
|
if deleteResp.Header == nil {
|
||||||
return errors.New("invalid DeleteRange response - nil header")
|
return errors.New("invalid DeleteRange response - nil header")
|
||||||
}
|
}
|
||||||
|
if !skipTransformDecode {
|
||||||
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
|
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordDecodeError(s.groupResourceString, key)
|
recordDecodeError(s.groupResourceString, key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -405,7 +409,8 @@ func (s *store) GuaranteedUpdate(
|
|||||||
return fmt.Errorf("unable to convert output object to pointer: %v", err)
|
return fmt.Errorf("unable to convert output object to pointer: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound)
|
skipTransformDecode := false
|
||||||
|
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound, skipTransformDecode)
|
||||||
|
|
||||||
var origState *objState
|
var origState *objState
|
||||||
var origStateIsCurrent bool
|
var origStateIsCurrent bool
|
||||||
@ -531,7 +536,8 @@ func (s *store) GuaranteedUpdate(
|
|||||||
if !txnResp.Succeeded {
|
if !txnResp.Succeeded {
|
||||||
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
||||||
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
|
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
|
||||||
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound)
|
skipTransformDecode := false
|
||||||
|
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound, skipTransformDecode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -878,7 +884,7 @@ func (s *store) watchContext(ctx context.Context) context.Context {
|
|||||||
return clientv3.WithRequireLeader(ctx)
|
return clientv3.WithRequireLeader(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool) func() (*objState, error) {
|
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) {
|
||||||
return func() (*objState, error) {
|
return func() (*objState, error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
getResp, err := s.client.KV.Get(ctx, key)
|
getResp, err := s.client.KV.Get(ctx, key)
|
||||||
@ -886,11 +892,17 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return s.getState(ctx, getResp, key, v, ignoreNotFound)
|
return s.getState(ctx, getResp, key, v, ignoreNotFound, skipTransformDecode)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
// getState constructs a new objState from the given response from the storage.
|
||||||
|
// skipTransformDecode: if true, the function will neither transform the data
|
||||||
|
// from the storage nor decode it into an object; otherwise, data from the
|
||||||
|
// storage will be transformed and decoded.
|
||||||
|
// NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields
|
||||||
|
// of the objState will be nil, and 'stale' will be set to true.
|
||||||
|
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) {
|
||||||
state := &objState{
|
state := &objState{
|
||||||
meta: &storage.ResponseMeta{},
|
meta: &storage.ResponseMeta{},
|
||||||
}
|
}
|
||||||
@ -909,14 +921,24 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
state.rev = getResp.Kvs[0].ModRevision
|
||||||
|
state.meta.ResourceVersion = uint64(state.rev)
|
||||||
|
|
||||||
|
if skipTransformDecode {
|
||||||
|
// be explicit that we don't have the object
|
||||||
|
state.obj = nil
|
||||||
|
state.stale = true // this seems a more sane value here
|
||||||
|
return state, nil
|
||||||
|
}
|
||||||
|
|
||||||
data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key))
|
data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, storage.NewInternalError(err.Error())
|
return nil, storage.NewInternalError(err.Error())
|
||||||
}
|
}
|
||||||
state.rev = getResp.Kvs[0].ModRevision
|
|
||||||
state.meta.ResourceVersion = uint64(state.rev)
|
|
||||||
state.data = data
|
state.data = data
|
||||||
state.stale = stale
|
state.stale = stale
|
||||||
|
|
||||||
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
||||||
recordDecodeError(s.groupResourceString, key)
|
recordDecodeError(s.groupResourceString, key)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -108,6 +108,8 @@ type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Obj
|
|||||||
// ValidateObjectFunc is a function to act on a given object. An error may be returned
|
// ValidateObjectFunc is a function to act on a given object. An error may be returned
|
||||||
// if the hook cannot be completed. The function may NOT transform the provided
|
// if the hook cannot be completed. The function may NOT transform the provided
|
||||||
// object.
|
// object.
|
||||||
|
// NOTE: the object in obj may be nil if it cannot be read from the
|
||||||
|
// storage, due to transformation or decode error.
|
||||||
type ValidateObjectFunc func(ctx context.Context, obj runtime.Object) error
|
type ValidateObjectFunc func(ctx context.Context, obj runtime.Object) error
|
||||||
|
|
||||||
// ValidateAllObjectFunc is a "admit everything" instance of ValidateObjectFunc.
|
// ValidateAllObjectFunc is a "admit everything" instance of ValidateObjectFunc.
|
||||||
|
39
staging/src/k8s.io/apiserver/pkg/storage/interfaces_test.go
Normal file
39
staging/src/k8s.io/apiserver/pkg/storage/interfaces_test.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2024 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPreconditionsCheckWithNilObject(t *testing.T) {
|
||||||
|
p := &Preconditions{}
|
||||||
|
err := p.Check("foo", nil)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
var internalErr InternalError
|
||||||
|
if !errors.As(err, &internalErr) {
|
||||||
|
t.Fatalf("expected error to be of type: %T, but got: %#v", InternalError{}, err)
|
||||||
|
}
|
||||||
|
if want := "can't enforce preconditions"; !strings.Contains(internalErr.Error(), want) {
|
||||||
|
t.Errorf("expected error to contain %q", want)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user