Merge pull request #58547 from liggitt/watch-cache-delete-resourceversion

Automatic merge from submit-queue (batch tested with PRs 58547, 57228, 58528, 58499, 58618). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Send correct resource version for delete events from watch cache

Fixes #58545 

the watch cache filtering is returning the previous object content intact, including resource version. this is the logic the watch cache uses:
```go
switch {
case curObjPasses && !oldObjPasses:
	watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
case curObjPasses && oldObjPasses:
	watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
case !curObjPasses && oldObjPasses:
	watchEvent = watch.Event{Type: watch.Deleted, Object: event.PrevObject.DeepCopyObject()}
}
```

when processing a delete event, we should be sending the old object's content *but* with the event's resource version set in it. corresponding logic exists in the uncached stores:

77ac663df4/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go (L401-L403)

77ac663df4/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go (L373-L378)


```release-note
Fixes an issue where the resourceVersion of an object in a DELETE watch event was not the resourceVersion of the delete itself, but of the last update to the object. This could cause clients receiving the watch event to revert to an old "last observed" resource version, and disrupt their ability to re-establish watches properly.
```
This commit is contained in:
Kubernetes Submit Queue 2018-01-22 20:49:30 -08:00 committed by GitHub
commit 619305f101
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 60 deletions

View File

@ -21,6 +21,7 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",

View File

@ -338,7 +338,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
@ -790,22 +790,24 @@ func (c *errWatcher) Stop() {
// cachWatcher implements watch.Interface
type cacheWatcher struct {
sync.Mutex
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter watchFilterFunc
stopped bool
forget func(bool)
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter watchFilterFunc
stopped bool
forget func(bool)
versioner Versioner
}
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
}
go watcher.process(initEvents, resourceVersion)
return watcher
@ -894,7 +896,12 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
case curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
case !curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Deleted, Object: event.PrevObject.DeepCopyObject()}
// return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
}
// We need to ensure that if we put event X to the c.result, all

View File

@ -17,15 +17,19 @@ limitations under the License.
package storage
import (
"fmt"
"reflect"
"strconv"
"sync"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -48,7 +52,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
}
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
w := newCacheWatcher(0, 0, initEvents, filter, forget)
w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
@ -73,28 +77,31 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 1,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
},
},
@ -102,50 +109,56 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 1,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 4,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 5,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 6,
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}},
},
},
}
@ -157,7 +170,7 @@ TestCase:
for j := range testCase.events {
testCase.events[j].ResourceVersion = uint64(j) + 1
}
w := newCacheWatcher(0, 0, testCase.events, filter, forget)
w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{})
ch := w.ResultChan()
for j, event := range testCase.expected {
e := <-ch
@ -175,3 +188,18 @@ TestCase:
w.Stop()
}
}
type testVersioner struct{}
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
}
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented")
}