mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Fix watching from resourceVersion=0 in etcd3 watcher
This commit is contained in:
parent
dbb4def470
commit
7f61d37996
@ -30,14 +30,15 @@ type event struct {
|
|||||||
isCreated bool
|
isCreated bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseKV(kv *mvccpb.KeyValue, prevVal []byte) *event {
|
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
|
||||||
|
func parseKV(kv *mvccpb.KeyValue) *event {
|
||||||
return &event{
|
return &event{
|
||||||
key: string(kv.Key),
|
key: string(kv.Key),
|
||||||
value: kv.Value,
|
value: kv.Value,
|
||||||
prevValue: prevVal,
|
prevValue: nil,
|
||||||
rev: kv.ModRevision,
|
rev: kv.ModRevision,
|
||||||
isDeleted: false,
|
isDeleted: false,
|
||||||
isCreated: kv.ModRevision == kv.CreateRevision,
|
isCreated: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,6 +146,7 @@ func (wc *watchChan) ResultChan() <-chan watch.Event {
|
|||||||
|
|
||||||
// sync tries to retrieve existing data and send them to process.
|
// sync tries to retrieve existing data and send them to process.
|
||||||
// The revision to watch will be set to the revision in response.
|
// The revision to watch will be set to the revision in response.
|
||||||
|
// All events sent will have isCreated=true
|
||||||
func (wc *watchChan) sync() error {
|
func (wc *watchChan) sync() error {
|
||||||
opts := []clientv3.OpOption{}
|
opts := []clientv3.OpOption{}
|
||||||
if wc.recursive {
|
if wc.recursive {
|
||||||
@ -156,17 +157,8 @@ func (wc *watchChan) sync() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
wc.initialRev = getResp.Header.Revision
|
wc.initialRev = getResp.Header.Revision
|
||||||
|
|
||||||
for _, kv := range getResp.Kvs {
|
for _, kv := range getResp.Kvs {
|
||||||
prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1), clientv3.WithSerializable())
|
wc.sendEvent(parseKV(kv))
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
var prevVal []byte
|
|
||||||
if len(prevResp.Kvs) > 0 {
|
|
||||||
prevVal = prevResp.Kvs[0].Value
|
|
||||||
}
|
|
||||||
wc.sendEvent(parseKV(kv, prevVal))
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -141,17 +142,64 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||||||
|
|
||||||
// TestWatchFromZero tests that
|
// TestWatchFromZero tests that
|
||||||
// - watch from 0 should sync up and grab the object added before
|
// - watch from 0 should sync up and grab the object added before
|
||||||
|
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||||
// - watch from non-0 should just watch changes after given version
|
// - watch from non-0 should just watch changes after given version
|
||||||
func TestWatchFromZero(t *testing.T) {
|
func TestWatchFromZero(t *testing.T) {
|
||||||
ctx, store, cluster := testSetup(t)
|
ctx, store, cluster := testSetup(t)
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
|
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, "0", storage.Everything)
|
w, err := store.Watch(ctx, key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, 0, watch.Added, w, storedObj)
|
testCheckResult(t, 0, watch.Added, w, storedObj)
|
||||||
|
w.Stop()
|
||||||
|
|
||||||
|
// Update
|
||||||
|
out := &api.Pod{}
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure when we watch from 0 we receive an ADDED event
|
||||||
|
w, err = store.Watch(ctx, key, "0", storage.Everything)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
testCheckResult(t, 1, watch.Added, w, out)
|
||||||
|
w.Stop()
|
||||||
|
|
||||||
|
// Update again
|
||||||
|
out = &api.Pod{}
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compact previous versions
|
||||||
|
revToCompact, err := strconv.Atoi(out.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
||||||
|
}
|
||||||
|
_, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error compacting: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure we can still watch from 0 and receive an ADDED event
|
||||||
|
w, err = store.Watch(ctx, key, "0", storage.Everything)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
testCheckResult(t, 2, watch.Added, w, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchFromNoneZero tests that
|
// TestWatchFromNoneZero tests that
|
||||||
|
Loading…
Reference in New Issue
Block a user