mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 02:34:03 +00:00
Merge pull request #25415 from hongchaodeng/e2
Automatic merge from submit-queue etcd_watcher: make Deleted Event.Object's version consistent ### What's the problem? In [sendDelete()](995f022808/pkg/storage/etcd/etcd_watcher.go (L437-L442)
), Deleted Event.Object's resource version will be set to the latest resource version. This is actually an assumption made by cacher that all later events should have objects with larger verions; See [here](995f022808/pkg/storage/cacher.go (L579-L581)
). In sendModify(), it could also return Deleted event. However, the resource version is still the old version, which is inconsistent behavior with sendDelete(). ### What's this PR? This PR sets the version of oldObj in sendModify() to be the latest version. Provided unit test for it.
This commit is contained in:
commit
be104a8271
@ -399,6 +399,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
|||||||
if res.PrevNode != nil && res.PrevNode.Value != "" {
|
if res.PrevNode != nil && res.PrevNode.Value != "" {
|
||||||
// Ignore problems reading the old object.
|
// Ignore problems reading the old object.
|
||||||
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
|
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
|
||||||
|
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
|
||||||
|
}
|
||||||
oldObjPasses = w.filter(oldObj)
|
oldObjPasses = w.filter(oldObj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -220,6 +220,74 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
|
||||||
|
codec := testapi.Default.Codec()
|
||||||
|
filter := func(obj runtime.Object) bool {
|
||||||
|
return obj.(*api.Pod).Name != "bar"
|
||||||
|
}
|
||||||
|
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{})
|
||||||
|
|
||||||
|
eventChan := make(chan watch.Event, 1)
|
||||||
|
w.emit = func(e watch.Event) {
|
||||||
|
eventChan <- e
|
||||||
|
}
|
||||||
|
|
||||||
|
fooPod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||||
|
barPod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
|
||||||
|
fooBytes, err := runtime.Encode(codec, fooPod)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Encode failed: %v", err)
|
||||||
|
}
|
||||||
|
barBytes, err := runtime.Encode(codec, barPod)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Encode failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
response *etcd.Response
|
||||||
|
expRV string
|
||||||
|
}{{ // Delete event
|
||||||
|
response: &etcd.Response{
|
||||||
|
Action: EtcdDelete,
|
||||||
|
Node: &etcd.Node{
|
||||||
|
ModifiedIndex: 2,
|
||||||
|
},
|
||||||
|
PrevNode: &etcd.Node{
|
||||||
|
Value: string(fooBytes),
|
||||||
|
ModifiedIndex: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expRV: "2",
|
||||||
|
}, { // Modify event with uninterested data
|
||||||
|
response: &etcd.Response{
|
||||||
|
Action: EtcdSet,
|
||||||
|
Node: &etcd.Node{
|
||||||
|
Value: string(barBytes),
|
||||||
|
ModifiedIndex: 2,
|
||||||
|
},
|
||||||
|
PrevNode: &etcd.Node{
|
||||||
|
Value: string(fooBytes),
|
||||||
|
ModifiedIndex: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expRV: "2",
|
||||||
|
}}
|
||||||
|
|
||||||
|
for i, tt := range tests {
|
||||||
|
w.sendResult(tt.response)
|
||||||
|
ev := <-eventChan
|
||||||
|
if ev.Type != watch.Deleted {
|
||||||
|
t.Errorf("#%d: event type want=Deleted, get=%s", i, ev.Type)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rv := ev.Object.(*api.Pod).ResourceVersion
|
||||||
|
if rv != tt.expRV {
|
||||||
|
t.Errorf("#%d: resource version want=%s, get=%s", i, tt.expRV, rv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
codec := testapi.Default.Codec()
|
codec := testapi.Default.Codec()
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
|
Loading…
Reference in New Issue
Block a user