mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Use current state from watchcache to avoid etcd get for deletions
This commit is contained in:
parent
c2d61896f4
commit
7bab6a9c6e
@ -433,8 +433,19 @@ func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object
|
||||
// Delete implements storage.Interface.
|
||||
func (c *Cacher) Delete(
|
||||
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
|
||||
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
|
||||
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject)
|
||||
validateDeletion storage.ValidateObjectFunc, _ runtime.Object) error {
|
||||
// Ignore the suggestion and try to pass down the current version of the object
|
||||
// read from cache.
|
||||
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
|
||||
klog.Errorf("GetByKey returned error: %v", err)
|
||||
} else if exists {
|
||||
// DeepCopy the object since we modify resource version when serializing the
|
||||
// current object.
|
||||
currObj := elem.(*storeElement).Object.DeepCopyObject()
|
||||
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj)
|
||||
}
|
||||
// If we couldn't get the object, fallback to no-suggestion.
|
||||
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
|
||||
}
|
||||
|
||||
// Watch implements storage.Interface.
|
||||
|
@ -199,25 +199,63 @@ func (s *store) Delete(
|
||||
func (s *store) conditionalDelete(
|
||||
ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
|
||||
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
|
||||
getCurrentState := func() (*objState, error) {
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.getState(getResp, key, v, false)
|
||||
}
|
||||
|
||||
var origState *objState
|
||||
var err error
|
||||
var origStateIsCurrent bool
|
||||
if cachedExistingObject != nil {
|
||||
origState, err = s.getStateFromObject(cachedExistingObject)
|
||||
} else {
|
||||
origState, err = getCurrentState()
|
||||
origStateIsCurrent = true
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
origState, err := s.getState(getResp, key, v, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if preconditions != nil {
|
||||
if err := preconditions.Check(key, origState.obj); err != nil {
|
||||
return err
|
||||
if origStateIsCurrent {
|
||||
return err
|
||||
}
|
||||
|
||||
// It's possible we're working with stale data.
|
||||
// Actually fetch
|
||||
origState, err = getCurrentState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origStateIsCurrent = true
|
||||
// Retry
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := validateDeletion(ctx, origState.obj); err != nil {
|
||||
return err
|
||||
if origStateIsCurrent {
|
||||
return err
|
||||
}
|
||||
|
||||
// It's possible we're working with stale data.
|
||||
// Actually fetch
|
||||
origState, err = getCurrentState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origStateIsCurrent = true
|
||||
// Retry
|
||||
continue
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
||||
@ -231,8 +269,13 @@ func (s *store) conditionalDelete(
|
||||
return err
|
||||
}
|
||||
if !txnResp.Succeeded {
|
||||
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
||||
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
||||
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
|
||||
origState, err = s.getState(getResp, key, v, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origStateIsCurrent = true
|
||||
continue
|
||||
}
|
||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||
@ -266,15 +309,12 @@ func (s *store) GuaranteedUpdate(
|
||||
var mustCheckData bool
|
||||
if suggestion != nil {
|
||||
origState, err = s.getStateFromObject(suggestion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mustCheckData = true
|
||||
} else {
|
||||
origState, err = getCurrentState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("initial value restored")
|
||||
|
||||
|
@ -374,6 +374,172 @@ func TestConditionalDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// The following set of Delete tests are testing the logic of adding `suggestion`
|
||||
// as a parameter with probably value of the current state.
|
||||
// Introducing it for GuaranteedUpdate cause a number of issues, so we're addressing
|
||||
// all of those upfront by adding appropriate tests:
|
||||
// - https://github.com/kubernetes/kubernetes/pull/35415
|
||||
// [DONE] Lack of tests originally - added TestDeleteWithSuggestion.
|
||||
// - https://github.com/kubernetes/kubernetes/pull/40664
|
||||
// [DONE] Irrelevant for delete, as Delete doesn't write data (nor compare it).
|
||||
// - https://github.com/kubernetes/kubernetes/pull/47703
|
||||
// [DONE] Irrelevant for delete, because Delete doesn't persist data.
|
||||
// - https://github.com/kubernetes/kubernetes/pull/48394/
|
||||
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
||||
// - https://github.com/kubernetes/kubernetes/pull/43152
|
||||
// [DONE] Added TestDeleteWithSuggestionAndConflict
|
||||
// - https://github.com/kubernetes/kubernetes/pull/54780
|
||||
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
||||
// - https://github.com/kubernetes/kubernetes/pull/58375
|
||||
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
||||
// - https://github.com/kubernetes/kubernetes/pull/77619
|
||||
// [DONE] Added TestValidateDeletionWithSuggestion for corresponding delete checks.
|
||||
// - https://github.com/kubernetes/kubernetes/pull/78713
|
||||
// [DONE] Bug was in getState function which is shared with the new code.
|
||||
// - https://github.com/kubernetes/kubernetes/pull/78713
|
||||
// [DONE] Added TestPreconditionalDeleteWithSuggestion
|
||||
|
||||
func TestDeleteWithSuggestion(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
out := &example.Pod{}
|
||||
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||
}
|
||||
|
||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error on reading object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteWithSuggestionAndConflict(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
// First update, so originalPod is outdated.
|
||||
updatedPod := &example.Pod{}
|
||||
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
pod.ObjectMeta.Labels = map[string]string{"foo": "bar"}
|
||||
return pod, nil
|
||||
}), nil); err != nil {
|
||||
t.Errorf("Unexpected failure during updated: %v", err)
|
||||
}
|
||||
|
||||
out := &example.Pod{}
|
||||
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||
}
|
||||
|
||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error on reading object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
// First delete, so originalPod is outdated.
|
||||
deletedPod := &example.Pod{}
|
||||
if err := store.Delete(ctx, key, deletedPod, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||
}
|
||||
|
||||
// Now try deleting with stale object.
|
||||
out := &example.Pod{}
|
||||
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); !storage.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error during deletion: %v, expected not-found", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateDeletionWithSuggestion(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
// Check that validaing fresh object fails.
|
||||
validationError := fmt.Errorf("validation error")
|
||||
validateNothing := func(_ context.Context, _ runtime.Object) error {
|
||||
return validationError
|
||||
}
|
||||
out := &example.Pod{}
|
||||
if err := store.Delete(ctx, key, out, nil, validateNothing, originalPod); err != validationError {
|
||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||
}
|
||||
|
||||
// First update, so originalPod is outdated.
|
||||
updatedPod := &example.Pod{}
|
||||
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
pod.ObjectMeta.Labels = map[string]string{"foo": "bar"}
|
||||
return pod, nil
|
||||
}), nil); err != nil {
|
||||
t.Errorf("Unexpected failure during updated: %v", err)
|
||||
}
|
||||
|
||||
calls := 0
|
||||
validateFresh := func(_ context.Context, obj runtime.Object) error {
|
||||
calls++
|
||||
pod := obj.(*example.Pod)
|
||||
if pod.ObjectMeta.Labels == nil || pod.ObjectMeta.Labels["foo"] != "bar" {
|
||||
return fmt.Errorf("stale object")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := store.Delete(ctx, key, out, nil, validateFresh, originalPod); err != nil {
|
||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||
}
|
||||
|
||||
if calls != 2 {
|
||||
t.Errorf("validate function should have been called twice, called %d", calls)
|
||||
}
|
||||
|
||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error on reading object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
// First update, so originalPod is outdated.
|
||||
updatedPod := &example.Pod{}
|
||||
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
pod.ObjectMeta.UID = "myUID"
|
||||
return pod, nil
|
||||
}), nil); err != nil {
|
||||
t.Errorf("Unexpected failure during updated: %v", err)
|
||||
}
|
||||
|
||||
prec := storage.NewUIDPreconditions("myUID")
|
||||
|
||||
out := &example.Pod{}
|
||||
if err := store.Delete(ctx, key, out, prec, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||
}
|
||||
|
||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error on reading object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetToList(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
Loading…
Reference in New Issue
Block a user