mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Fix setting resource version on deletion
This commit is contained in:
parent
3628532311
commit
bbcf5e3877
@ -330,7 +330,15 @@ func (s *store) conditionalDelete(
|
|||||||
origStateIsCurrent = true
|
origStateIsCurrent = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
|
||||||
|
if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil {
|
||||||
|
return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses))
|
||||||
|
}
|
||||||
|
deleteResp := txnResp.Responses[0].GetResponseDeleteRange()
|
||||||
|
if deleteResp.Header == nil {
|
||||||
|
return errors.New("invalid DeleteRange response - nil header")
|
||||||
|
}
|
||||||
|
return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,8 +23,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
|
||||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -72,7 +70,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||||||
origCtx, store, _ := testSetup(t)
|
origCtx, store, _ := testSetup(t)
|
||||||
ctx, cancel := context.WithCancel(origCtx)
|
ctx, cancel := context.WithCancel(origCtx)
|
||||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything)
|
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything)
|
||||||
// make resutlChan and errChan blocking to ensure ordering.
|
// make resultChan and errChan blocking to ensure ordering.
|
||||||
w.resultChan = make(chan watch.Event)
|
w.resultChan = make(chan watch.Event)
|
||||||
w.errChan = make(chan error)
|
w.errChan = make(chan error)
|
||||||
// The event flow goes like:
|
// The event flow goes like:
|
||||||
@ -91,57 +89,31 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
|
||||||
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout)
|
||||||
|
w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(storedObj)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to parse resourceVersion on stored object: %v", err)
|
|
||||||
}
|
|
||||||
etcdW := client.Watch(ctx, key, clientv3.WithRev(int64(rv)))
|
|
||||||
|
|
||||||
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
|
deletedObj := &example.Pod{}
|
||||||
|
if err := store.Delete(ctx, key, deletedObj, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
|
||||||
t.Fatalf("Delete failed: %v", err)
|
t.Fatalf("Delete failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var e watch.Event
|
// Verify that ResourceVersion has changed on deletion.
|
||||||
watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout)
|
if storedObj.ResourceVersion == deletedObj.ResourceVersion {
|
||||||
|
t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case e = <-w.ResultChan():
|
case event := <-w.ResultChan():
|
||||||
case <-watchCtx.Done():
|
watchedDeleteObj := event.Object.(*example.Pod)
|
||||||
t.Fatalf("timed out waiting for watch event")
|
if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a {
|
||||||
}
|
t.Errorf("Unexpected resource version: %v, expected %v", a, e)
|
||||||
deletedRV, err := deletedRevision(watchCtx, etcdW)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("did not see delete event in raw watch: %v", err)
|
|
||||||
}
|
|
||||||
watchedDeleteObj := e.Object.(*example.Pod)
|
|
||||||
|
|
||||||
watchedDeleteRev, err := store.versioner.ParseResourceVersion(watchedDeleteObj.ResourceVersion)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
|
||||||
}
|
|
||||||
if int64(watchedDeleteRev) != deletedRV {
|
|
||||||
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
|
|
||||||
watchedDeleteRev, deletedRV)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (int64, error) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return 0, ctx.Err()
|
|
||||||
case wres := <-watch:
|
|
||||||
for _, evt := range wres.Events {
|
|
||||||
if evt.Type == mvccpb.DELETE && evt.Kv != nil {
|
|
||||||
return evt.Kv.ModRevision, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -225,14 +225,20 @@ func RunTestUnconditionalDelete(ctx context.Context, t *testing.T, store storage
|
|||||||
err := store.Delete(ctx, tt.key, out, nil, storage.ValidateAllObjectFunc, nil)
|
err := store.Delete(ctx, tt.key, out, nil, storage.ValidateAllObjectFunc, nil)
|
||||||
if tt.expectNotFoundErr {
|
if tt.expectNotFoundErr {
|
||||||
if err == nil || !storage.IsNotFound(err) {
|
if err == nil || !storage.IsNotFound(err) {
|
||||||
t.Errorf("%s: expecting not found error, but get: %s", tt.name, err)
|
t.Errorf("expecting not found error, but get: %s", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
t.Fatalf("Delete failed: %v", err)
|
||||||
}
|
}
|
||||||
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod:", tt.name), tt.expectedObj, out)
|
// We expect the resource version of the returned object to be
|
||||||
|
// updated compared to the last existing object.
|
||||||
|
if storedObj.ResourceVersion == out.ResourceVersion {
|
||||||
|
t.Errorf("expecting resource version to be updated, but get: %s", out.ResourceVersion)
|
||||||
|
}
|
||||||
|
out.ResourceVersion = storedObj.ResourceVersion
|
||||||
|
ExpectNoDiff(t, "incorrect pod:", tt.expectedObj, out)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -260,14 +266,20 @@ func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.I
|
|||||||
err := store.Delete(ctx, key, out, tt.precondition, storage.ValidateAllObjectFunc, nil)
|
err := store.Delete(ctx, key, out, tt.precondition, storage.ValidateAllObjectFunc, nil)
|
||||||
if tt.expectInvalidObjErr {
|
if tt.expectInvalidObjErr {
|
||||||
if err == nil || !storage.IsInvalidObj(err) {
|
if err == nil || !storage.IsInvalidObj(err) {
|
||||||
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
t.Errorf("expecting invalid UID error, but get: %s", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
t.Fatalf("Delete failed: %v", err)
|
||||||
}
|
}
|
||||||
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), storedObj, out)
|
// We expect the resource version of the returned object to be
|
||||||
|
// updated compared to the last existing object.
|
||||||
|
if storedObj.ResourceVersion == out.ResourceVersion {
|
||||||
|
t.Errorf("expecting resource version to be updated, but get: %s", out.ResourceVersion)
|
||||||
|
}
|
||||||
|
out.ResourceVersion = storedObj.ResourceVersion
|
||||||
|
ExpectNoDiff(t, "incorrect pod:", storedObj, out)
|
||||||
key, storedObj = TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
key, storedObj = TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user