Cache watch returns incorrect object on DELETED events

The underlying storage has always returned the old object on watch
delete events when filtering. The cache watcher does not, which means a
downsteam caller gets different behavior.

This fixes the cache watcher to be consistent with our long term
behavior for watch. It may result in a behavior change (the filter
becomes more precise) but this was a regression in behavior.
This commit is contained in:
Clayton Coleman 2017-05-22 11:53:44 -04:00
parent 8cd95c78c4
commit e9e69356e4
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
3 changed files with 137 additions and 5 deletions

View File

@ -28,6 +28,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",

View File

@ -850,18 +850,28 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
return
}
object, err := c.copier.Copy(event.Object)
if err != nil {
glog.Errorf("unexpected copy error: %v", err)
return
}
var watchEvent watch.Event
switch {
case curObjPasses && !oldObjPasses:
object, err := c.copier.Copy(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
return
}
watchEvent = watch.Event{Type: watch.Added, Object: object}
case curObjPasses && oldObjPasses:
object, err := c.copier.Copy(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
return
}
watchEvent = watch.Event{Type: watch.Modified, Object: object}
case !curObjPasses && oldObjPasses:
object, err := c.copier.Copy(event.PrevObject)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
return
}
watchEvent = watch.Event{Type: watch.Deleted, Object: object}
}

View File

@ -17,13 +17,17 @@ limitations under the License.
package storage
import (
"reflect"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/pkg/api/v1"
)
@ -55,3 +59,120 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
}
}
func TestCacheWatcherHandlesFiltering(t *testing.T) {
filter := func(_ string, _ labels.Set, field fields.Set) bool {
return field["spec.nodeName"] == "host"
}
forget := func(bool) {}
testCases := []struct {
events []*watchCacheEvent
expected []watch.Event
}{
// properly handle starting with the filter, then being deleted, then re-added
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
},
},
// properly handle ignoring changes prior to the filter, then getting added, then deleted
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
},
},
}
TestCase:
for i, testCase := range testCases {
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
for j := range testCase.events {
testCase.events[j].ResourceVersion = uint64(j) + 1
}
w := newCacheWatcher(scheme.Scheme, 0, 0, testCase.events, filter, forget)
ch := w.ResultChan()
for j, event := range testCase.expected {
e := <-ch
if !reflect.DeepEqual(event, e) {
t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e))
break TestCase
}
}
select {
case obj, ok := <-ch:
t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok)
break TestCase
default:
}
w.Stop()
}
}