mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-21 04:45:09 +00:00
handle watch for unsafe delete
Kubernetes-commit: 25efc8f2d136a9574166be02789ac727c5b4a3fd
This commit is contained in:
parent
95fc3d7e88
commit
e132ac21c9
149
tools/cache/reflector_test.go
vendored
149
tools/cache/reflector_test.go
vendored
@ -21,9 +21,12 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
goruntime "runtime"
|
goruntime "runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -1753,6 +1756,152 @@ func TestReflectorListExtract(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
|
||||||
|
mkPod := func(id string, rv string) *v1.Pod {
|
||||||
|
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}}
|
||||||
|
}
|
||||||
|
mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
|
||||||
|
list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
|
||||||
|
for _, pod := range pods {
|
||||||
|
list.Items = append(list.Items, *pod)
|
||||||
|
}
|
||||||
|
return list
|
||||||
|
}
|
||||||
|
makeStatus := func() *metav1.Status {
|
||||||
|
return &metav1.Status{
|
||||||
|
Status: metav1.StatusFailure,
|
||||||
|
Code: http.StatusInternalServerError,
|
||||||
|
Reason: metav1.StatusReasonStoreReadError,
|
||||||
|
Message: "failed to prepare current and previous objects: corrupt object has been deleted",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// these pods preexist and never get updated/deleted
|
||||||
|
preExisting := mkPod("foo-1", "1")
|
||||||
|
pods := []*v1.Pod{preExisting, mkPod("foo-2", "2"), mkPod("foo-3", "3")}
|
||||||
|
lastExpectedRV := "5"
|
||||||
|
lists := []*v1.PodList{
|
||||||
|
mkList("3", pods...), // initial list
|
||||||
|
mkList(lastExpectedRV, pods...), // re-list due to watch error
|
||||||
|
}
|
||||||
|
corruptObj := mkPod("foo", "4")
|
||||||
|
events := []watch.Event{
|
||||||
|
{Type: watch.Added, Object: corruptObj},
|
||||||
|
// the object becomes corrupt, and it gets unsafe-deleted, and
|
||||||
|
// watch sends the following Error event, note the RV has
|
||||||
|
// advanced to "5" in the storage due to the delete operation
|
||||||
|
{Type: watch.Error, Object: makeStatus()},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||||
|
var replaceInvoked atomic.Int32
|
||||||
|
store := &fakeStore{
|
||||||
|
Store: s,
|
||||||
|
beforeReplace: func(list []interface{}, rv string) {
|
||||||
|
// interested in the Replace call that happens after the Error event
|
||||||
|
if rv == lastExpectedRV {
|
||||||
|
replaceInvoked.Add(1)
|
||||||
|
_, exists, err := s.Get(corruptObj)
|
||||||
|
if err != nil || !exists {
|
||||||
|
t.Errorf("expected the object to exist in the store, exists: %t, err: %v", exists, err)
|
||||||
|
}
|
||||||
|
_, exists, err = s.Get(preExisting)
|
||||||
|
if err != nil || !exists {
|
||||||
|
t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
afterReplace: func(rv string, err error) {
|
||||||
|
if rv == lastExpectedRV {
|
||||||
|
replaceInvoked.Add(1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("expected Replace to have succeeded, but got error: %v", err)
|
||||||
|
}
|
||||||
|
_, exists, err := s.Get(corruptObj)
|
||||||
|
if err != nil || exists {
|
||||||
|
t.Errorf("expected the object to have been removed from the store, exists: %t, err: %v", exists, err)
|
||||||
|
}
|
||||||
|
// show that a pre-existing pod is still in the cache
|
||||||
|
_, exists, err = s.Get(preExisting)
|
||||||
|
if err != nil || !exists {
|
||||||
|
t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var once sync.Once
|
||||||
|
lw := &testLW{
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
fw := watch.NewFake()
|
||||||
|
go func() {
|
||||||
|
once.Do(func() {
|
||||||
|
for _, e := range events {
|
||||||
|
fw.Action(e.Type, e.Object)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
var list runtime.Object
|
||||||
|
if len(lists) > 0 {
|
||||||
|
list = lists[0]
|
||||||
|
lists = lists[1:]
|
||||||
|
}
|
||||||
|
return list, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
r := NewReflector(lw, &v1.Pod{}, store, 0)
|
||||||
|
doneCh, stopCh := make(chan struct{}), make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(doneCh)
|
||||||
|
r.Run(stopCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait for the RV to sync to the version returned by the final list
|
||||||
|
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||||
|
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
|
||||||
|
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
|
||||||
|
}
|
||||||
|
if want, got := 2, int(replaceInvoked.Load()); want != got {
|
||||||
|
t.Errorf("expected store Delete hooks to be invoked %d times, but got: %d", want, got)
|
||||||
|
}
|
||||||
|
if want, got := len(pods), len(s.List()); want != got {
|
||||||
|
t.Errorf("expected the store to have %d objects, but got: %d", want, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(stopCh)
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("timed out waiting for Run to return")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeStore struct {
|
||||||
|
Store
|
||||||
|
beforeReplace func(list []interface{}, s string)
|
||||||
|
afterReplace func(rv string, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeStore) Replace(list []interface{}, rv string) error {
|
||||||
|
f.beforeReplace(list, rv)
|
||||||
|
err := f.Store.Replace(list, rv)
|
||||||
|
f.afterReplace(rv, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkExtractList(b *testing.B) {
|
func BenchmarkExtractList(b *testing.B) {
|
||||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||||
|
Loading…
Reference in New Issue
Block a user