Send correct resource version for delete events from watch cache

This commit is contained in:
Jordan Liggitt 2018-01-19 17:44:25 -05:00
parent f9bb978ad6
commit 57998d247d
No known key found for this signature in database
GPG Key ID: 39928704103C7229
3 changed files with 96 additions and 60 deletions

View File

@ -21,6 +21,7 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",

View File

@ -338,7 +338,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget) watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++ c.watcherIdx++
@ -790,22 +790,24 @@ func (c *errWatcher) Stop() {
// cachWatcher implements watch.Interface // cachWatcher implements watch.Interface
type cacheWatcher struct { type cacheWatcher struct {
sync.Mutex sync.Mutex
input chan *watchCacheEvent input chan *watchCacheEvent
result chan watch.Event result chan watch.Event
done chan struct{} done chan struct{}
filter watchFilterFunc filter watchFilterFunc
stopped bool stopped bool
forget func(bool) forget func(bool)
versioner Versioner
} }
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher { func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
watcher := &cacheWatcher{ watcher := &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize), input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize), result: make(chan watch.Event, chanSize),
done: make(chan struct{}), done: make(chan struct{}),
filter: filter, filter: filter,
stopped: false, stopped: false,
forget: forget, forget: forget,
versioner: versioner,
} }
go watcher.process(initEvents, resourceVersion) go watcher.process(initEvents, resourceVersion)
return watcher return watcher
@ -894,7 +896,12 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
case curObjPasses && oldObjPasses: case curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
case !curObjPasses && oldObjPasses: case !curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Deleted, Object: event.PrevObject.DeepCopyObject()} // return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
} }
// We need to ensure that if we put event X to the c.result, all // We need to ensure that if we put event X to the c.result, all

View File

@ -17,15 +17,19 @@ limitations under the License.
package storage package storage
import ( import (
"fmt"
"reflect" "reflect"
"strconv"
"sync" "sync"
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@ -48,7 +52,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
} }
// set the size of the buffer of w.result to 0, so that the writes to // set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked. // w.result is blocked.
w := newCacheWatcher(0, 0, initEvents, filter, forget) w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
w.Stop() w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock() lock.RLock()
@ -73,28 +77,31 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
{ {
events: []*watchCacheEvent{ events: []*watchCacheEvent{
{ {
Type: watch.Added, Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"}, ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 1,
}, },
{ {
Type: watch.Modified, Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"}, PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""}, ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
}, },
{ {
Type: watch.Modified, Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""}, PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"}, ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
}, },
}, },
expected: []watch.Event{ expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
}, },
}, },
@ -102,50 +109,56 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
{ {
events: []*watchCacheEvent{ events: []*watchCacheEvent{
{ {
Type: watch.Added, Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""}, ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 1,
}, },
{ {
Type: watch.Modified, Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""}, PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""}, ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
}, },
{ {
Type: watch.Modified, Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""}, PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"}, ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
}, },
{ {
Type: watch.Modified, Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"}, PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"}, ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 4,
}, },
{ {
Type: watch.Modified, Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"}, PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""}, ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 5,
}, },
{ {
Type: watch.Modified, Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""}, PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""}, ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 6,
}, },
}, },
expected: []watch.Event{ expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, {Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}},
}, },
}, },
} }
@ -157,7 +170,7 @@ TestCase:
for j := range testCase.events { for j := range testCase.events {
testCase.events[j].ResourceVersion = uint64(j) + 1 testCase.events[j].ResourceVersion = uint64(j) + 1
} }
w := newCacheWatcher(0, 0, testCase.events, filter, forget) w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{})
ch := w.ResultChan() ch := w.ResultChan()
for j, event := range testCase.expected { for j, event := range testCase.expected {
e := <-ch e := <-ch
@ -175,3 +188,18 @@ TestCase:
w.Stop() w.Stop()
} }
} }
type testVersioner struct{}
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
}
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented")
}