mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Change 'sendCreate' to 'sendAdd'; integration passes.
This commit is contained in:
parent
db1c0db5c7
commit
6b69ed402b
@ -126,7 +126,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
resourceVersion = latest
|
||||
resourceVersion = latest + 1
|
||||
}
|
||||
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
|
||||
if err != etcd.ErrWatchStoppedByUser {
|
||||
@ -146,11 +146,11 @@ func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, inc
|
||||
return
|
||||
}
|
||||
if index, ok := etcdErrorIndex(err); ok {
|
||||
resourceVersion = index + 1
|
||||
resourceVersion = index
|
||||
}
|
||||
return
|
||||
}
|
||||
resourceVersion = resp.EtcdIndex + 1
|
||||
resourceVersion = resp.EtcdIndex
|
||||
convertRecursiveResponse(resp.Node, resp, incoming)
|
||||
return
|
||||
}
|
||||
@ -217,7 +217,7 @@ func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, erro
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) sendCreate(res *etcd.Response) {
|
||||
func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
||||
if res.Node == nil {
|
||||
glog.Errorf("unexpected nil node: %#v", res)
|
||||
return
|
||||
@ -296,7 +296,14 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
||||
return
|
||||
}
|
||||
data := []byte(res.PrevNode.Value)
|
||||
obj, err := w.decodeObject(data, res.PrevNode.ModifiedIndex)
|
||||
index := res.PrevNode.ModifiedIndex
|
||||
if res.Node != nil {
|
||||
// Note that this sends the *old* object with the etcd index for the time at
|
||||
// which it gets deleted. This will allow users to restart the watch at the right
|
||||
// index.
|
||||
index = res.Node.ModifiedIndex
|
||||
}
|
||||
obj, err := w.decodeObject(data, index)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
@ -316,7 +323,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
||||
func (w *etcdWatcher) sendResult(res *etcd.Response) {
|
||||
switch res.Action {
|
||||
case "create", "get":
|
||||
w.sendCreate(res)
|
||||
w.sendAdd(res)
|
||||
case "set", "compareAndSwap":
|
||||
w.sendModify(res)
|
||||
case "delete":
|
||||
|
@ -125,7 +125,7 @@ func TestWatch(t *testing.T) {
|
||||
expectedVersion = resp.Node.ModifiedIndex
|
||||
event = <-w.ResultChan()
|
||||
if event.Type != watch.Deleted {
|
||||
t.Fatalf("expected deleted event", event)
|
||||
t.Errorf("expected deleted event %#v", event)
|
||||
}
|
||||
pod = event.Object.(*api.Pod)
|
||||
if pod.ResourceVersion != expectedVersion {
|
||||
|
Loading…
Reference in New Issue
Block a user