mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #113369 from wojtek-t/fix_delete_resource_version
Fix setting resource version on etcd3 deletion
This commit is contained in:
commit
421213b7a1
@ -330,7 +330,15 @@ func (s *store) conditionalDelete(
|
||||
origStateIsCurrent = true
|
||||
continue
|
||||
}
|
||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||
|
||||
if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil {
|
||||
return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses))
|
||||
}
|
||||
deleteResp := txnResp.Responses[0].GetResponseDeleteRange()
|
||||
if deleteResp.Header == nil {
|
||||
return errors.New("invalid DeleteRange response - nil header")
|
||||
}
|
||||
return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,8 +23,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -72,7 +70,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
origCtx, store, _ := testSetup(t)
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything)
|
||||
// make resutlChan and errChan blocking to ensure ordering.
|
||||
// make resultChan and errChan blocking to ensure ordering.
|
||||
w.resultChan = make(chan watch.Event)
|
||||
w.errChan = make(chan error)
|
||||
// The event flow goes like:
|
||||
@ -91,57 +89,31 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||
ctx, store, client := testSetup(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||
watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout)
|
||||
w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(storedObj)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to parse resourceVersion on stored object: %v", err)
|
||||
}
|
||||
etcdW := client.Watch(ctx, key, clientv3.WithRev(int64(rv)))
|
||||
|
||||
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
|
||||
deletedObj := &example.Pod{}
|
||||
if err := store.Delete(ctx, key, deletedObj, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
|
||||
var e watch.Event
|
||||
watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout)
|
||||
// Verify that ResourceVersion has changed on deletion.
|
||||
if storedObj.ResourceVersion == deletedObj.ResourceVersion {
|
||||
t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion)
|
||||
}
|
||||
|
||||
select {
|
||||
case e = <-w.ResultChan():
|
||||
case <-watchCtx.Done():
|
||||
t.Fatalf("timed out waiting for watch event")
|
||||
}
|
||||
deletedRV, err := deletedRevision(watchCtx, etcdW)
|
||||
if err != nil {
|
||||
t.Fatalf("did not see delete event in raw watch: %v", err)
|
||||
}
|
||||
watchedDeleteObj := e.Object.(*example.Pod)
|
||||
|
||||
watchedDeleteRev, err := store.versioner.ParseResourceVersion(watchedDeleteObj.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
||||
}
|
||||
if int64(watchedDeleteRev) != deletedRV {
|
||||
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
|
||||
watchedDeleteRev, deletedRV)
|
||||
}
|
||||
}
|
||||
|
||||
func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (int64, error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case wres := <-watch:
|
||||
for _, evt := range wres.Events {
|
||||
if evt.Type == mvccpb.DELETE && evt.Kv != nil {
|
||||
return evt.Kv.ModRevision, nil
|
||||
}
|
||||
}
|
||||
case event := <-w.ResultChan():
|
||||
watchedDeleteObj := event.Object.(*example.Pod)
|
||||
if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a {
|
||||
t.Errorf("Unexpected resource version: %v, expected %v", a, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -225,14 +225,20 @@ func RunTestUnconditionalDelete(ctx context.Context, t *testing.T, store storage
|
||||
err := store.Delete(ctx, tt.key, out, nil, storage.ValidateAllObjectFunc, nil)
|
||||
if tt.expectNotFoundErr {
|
||||
if err == nil || !storage.IsNotFound(err) {
|
||||
t.Errorf("%s: expecting not found error, but get: %s", tt.name, err)
|
||||
t.Errorf("expecting not found error, but get: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod:", tt.name), tt.expectedObj, out)
|
||||
// We expect the resource version of the returned object to be
|
||||
// updated compared to the last existing object.
|
||||
if storedObj.ResourceVersion == out.ResourceVersion {
|
||||
t.Errorf("expecting resource version to be updated, but get: %s", out.ResourceVersion)
|
||||
}
|
||||
out.ResourceVersion = storedObj.ResourceVersion
|
||||
ExpectNoDiff(t, "incorrect pod:", tt.expectedObj, out)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -260,14 +266,20 @@ func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.I
|
||||
err := store.Delete(ctx, key, out, tt.precondition, storage.ValidateAllObjectFunc, nil)
|
||||
if tt.expectInvalidObjErr {
|
||||
if err == nil || !storage.IsInvalidObj(err) {
|
||||
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
||||
t.Errorf("expecting invalid UID error, but get: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), storedObj, out)
|
||||
// We expect the resource version of the returned object to be
|
||||
// updated compared to the last existing object.
|
||||
if storedObj.ResourceVersion == out.ResourceVersion {
|
||||
t.Errorf("expecting resource version to be updated, but get: %s", out.ResourceVersion)
|
||||
}
|
||||
out.ResourceVersion = storedObj.ResourceVersion
|
||||
ExpectNoDiff(t, "incorrect pod:", storedObj, out)
|
||||
key, storedObj = TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user