diff --git a/pkg/storage/etcd3/event.go b/pkg/storage/etcd3/event.go index 4746dafb421..7dc9175bcf8 100644 --- a/pkg/storage/etcd3/event.go +++ b/pkg/storage/etcd3/event.go @@ -30,14 +30,15 @@ type event struct { isCreated bool } -func parseKV(kv *mvccpb.KeyValue, prevVal []byte) *event { +// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event. +func parseKV(kv *mvccpb.KeyValue) *event { return &event{ key: string(kv.Key), value: kv.Value, - prevValue: prevVal, + prevValue: nil, rev: kv.ModRevision, isDeleted: false, - isCreated: kv.ModRevision == kv.CreateRevision, + isCreated: true, } } diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 63ede7ee2e9..1326ec7bff9 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -146,6 +146,7 @@ func (wc *watchChan) ResultChan() <-chan watch.Event { // sync tries to retrieve existing data and send them to process. // The revision to watch will be set to the revision in response. +// All events sent will have isCreated=true func (wc *watchChan) sync() error { opts := []clientv3.OpOption{} if wc.recursive { @@ -156,17 +157,8 @@ func (wc *watchChan) sync() error { return err } wc.initialRev = getResp.Header.Revision - for _, kv := range getResp.Kvs { - prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1), clientv3.WithSerializable()) - if err != nil { - return err - } - var prevVal []byte - if len(prevResp.Kvs) > 0 { - prevVal = prevResp.Kvs[0].Value - } - wc.sendEvent(parseKV(kv, prevVal)) + wc.sendEvent(parseKV(kv)) } return nil } diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 43281b1d799..b7e9f785dc2 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "reflect" + "strconv" "sync" "testing" "time" @@ -141,17 +142,64 @@ func TestDeleteTriggerWatch(t *testing.T) { // TestWatchFromZero tests that // - watch from 0 should sync up and grab the object added before +// - watch from 0 is able to return events for objects whose previous version has been compacted // - watch from non-0 should just watch changes after given version func TestWatchFromZero(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) - key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns"}}) w, err := store.Watch(ctx, key, "0", storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } testCheckResult(t, 0, watch.Added, w, storedObj) + w.Stop() + + // Update + out := &api.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil + })) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Make sure when we watch from 0 we receive an ADDED event + w, err = store.Watch(ctx, key, "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 1, watch.Added, w, out) + w.Stop() + + // Update again + out = &api.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil + })) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Compact previous versions + revToCompact, err := strconv.Atoi(out.ResourceVersion) + if err != nil { + t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) + } + _, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) + if err != nil { + t.Fatalf("Error compacting: %v", err) + } + + // Make sure we can still watch from 0 and receive an ADDED event + w, err = store.Watch(ctx, key, "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 2, watch.Added, w, out) } // TestWatchFromNoneZero tests that