Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 11:50:44 +00:00)
Merge pull request #96662 from wojtek-t/fix_starting_rv_test
Fix TestStartingResourceVersion flakiness
Commit 06b0179895
@@ -1006,6 +1006,87 @@ func (f *fakeTimeBudget) takeAvailable() time.Duration {
 
 func (f *fakeTimeBudget) returnUnused(_ time.Duration) {}
 
+func TestStartingResourceVersion(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _, err := newTestCacher(backingStorage)
+	if err != nil {
+		t.Fatalf("Couldn't create cacher: %v", err)
+	}
+	defer cacher.Stop()
+
+	// Wait until cacher is initialized.
+	cacher.ready.wait()
+
+	// Ensure there is some budget for slowing down processing.
+	// We use the fakeTimeBudget to prevent this test from flaking under
+	// the following conditions:
+	// 1) in total we create 11 events that has to be processed by the watcher
+	// 2) the size of the channels are set to 10 for the watcher
+	// 3) if the test is cpu-starved and the internal goroutine is not picking
+	//    up these events from the channel, after consuming the whole time
+	//    budget (defaulted to 100ms) on waiting, we will simply close the watch,
+	//    which will cause the test failure
+	// Using fakeTimeBudget gives us always a budget to wait and have a test
+	// pick up something from ResultCh in the meantime.
+	//
+	// The same can potentially happen in production, but in that case a watch
+	// can be resumed by the client. This doesn't work in the case of this test,
+	// because we explicitly want to test the behavior that object changes are
+	// happening after the watch was initiated.
+	cacher.dispatchTimeoutBudget = &fakeTimeBudget{}
+
+	makePod := func(i int) *examplev1.Pod {
+		return &examplev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            "foo",
+				Namespace:       "ns",
+				Labels:          map[string]string{"foo": strconv.Itoa(i)},
+				ResourceVersion: fmt.Sprintf("%d", i),
+			},
+		}
+	}
+
+	if err := cacher.watchCache.Add(makePod(1000)); err != nil {
+		t.Errorf("error: %v", err)
+	}
+	// Advance RV by 10.
+	startVersion := uint64(1010)
+
+	watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: strconv.FormatUint(startVersion, 10), Predicate: storage.Everything})
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	defer watcher.Stop()
+
+	for i := 1; i <= 11; i++ {
+		if err := cacher.watchCache.Update(makePod(1000 + i)); err != nil {
+			t.Errorf("error: %v", err)
+		}
+	}
+
+	select {
+	case e, ok := <-watcher.ResultChan():
+		if !ok {
+			t.Errorf("unexpectedly closed watch")
+			break
+		}
+		object := e.Object
+		if co, ok := object.(runtime.CacheableObject); ok {
+			object = co.GetObject()
+		}
+		pod := object.(*examplev1.Pod)
+		podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+
+		// event should have at least rv + 1, since we're starting the watch at rv
+		if podRV <= startVersion {
+			t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV)
+		}
+	}
+}
+
 func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
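For orientation, the new test relies on the fakeTimeBudget type whose two methods appear as context around this hunk (takeAvailable and returnUnused). The diff never shows the type's body, so the following is only a minimal sketch of what such a stand-in could look like, assuming the test file's existing time import; the 2*time.Minute value is an assumption chosen to make the budget effectively unlimited for the test, not a value taken from the commit.

// Sketch of a timeBudget stand-in for tests; the concrete duration is assumed.
type fakeTimeBudget struct{}

func (f *fakeTimeBudget) takeAvailable() time.Duration {
	// Always hand out a generous budget so event dispatching never gives up
	// waiting just because the test goroutine is CPU-starved.
	return 2 * time.Minute
}

// Discard returned budget; a fake does not need to track it.
func (f *fakeTimeBudget) returnUnused(_ time.Duration) {}

With such a budget installed via cacher.dispatchTimeoutBudget = &fakeTimeBudget{}, a slow reader of watcher.ResultChan() can no longer exhaust the default 100ms wait and trigger an early close of the watch, which is exactly the flake the comment in the test describes.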
@@ -1018,8 +1099,8 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
 	cacher.ready.wait()
 
 	// Ensure there is some budget for slowing down processing.
-	// When using the official `timeBudgetImpl` we were observing flakiness
-	// due under the following conditions:
+	// We use the fakeTimeBudget to prevent this test from flaking under
+	// the following conditions:
 	// 1) the watch w1 is blocked, so we were consuming the whole budget once
 	//    its buffer was filled in (10 items)
 	// 2) the budget is refreshed once per second, so it basically wasn't
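The comment rewritten in this hunk points at the mechanism behind the flake: the production budget is capped (the comments mention a 100ms default) and is only topped up roughly once per second, so a watcher whose 10-item buffer is full can drain it completely. As a rough illustration only, with every name and constant assumed rather than taken from the Kubernetes source, a budget with that refresh behavior could be sketched like this (using sync and time from the standard library):

// refreshingBudget caps how long dispatch may wait and tops the budget up
// periodically; names and constants here are assumptions for illustration.
type refreshingBudget struct {
	mu        sync.Mutex
	available time.Duration
	max       time.Duration
}

func newRefreshingBudget(stopCh <-chan struct{}) *refreshingBudget {
	b := &refreshingBudget{available: 100 * time.Millisecond, max: 100 * time.Millisecond}
	go func() {
		ticker := time.NewTicker(time.Second) // "refreshed once per second"
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				b.mu.Lock()
				b.available = b.max // top the budget back up
				b.mu.Unlock()
			case <-stopCh:
				return
			}
		}
	}()
	return b
}

// takeAvailable hands out whatever is left; a blocked watcher that keeps
// asking drains it to zero until the next refresh.
func (b *refreshingBudget) takeAvailable() time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()
	d := b.available
	b.available = 0
	return d
}

// returnUnused gives back budget that was not spent, never exceeding the cap.
func (b *refreshingBudget) returnUnused(unused time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.available += unused
	if b.available > b.max {
		b.available = b.max
	}
}

Swapping a budget of this kind for the always-generous fakeTimeBudget is what removes the timing dependence from both tests.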
@@ -603,61 +603,6 @@ func TestFiltering(t *testing.T) {
 	verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
 }
 
-func TestStartingResourceVersion(t *testing.T) {
-	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
-	defer server.Terminate(t)
-	cacher, v, err := newTestCacher(etcdStorage)
-	if err != nil {
-		t.Fatalf("Couldn't create cacher: %v", err)
-	}
-	defer cacher.Stop()
-
-	// add 1 object
-	podFoo := makeTestPod("foo")
-	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
-
-	// Set up Watch starting at fooCreated.ResourceVersion + 10
-	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	rv += 10
-	startVersion := strconv.Itoa(int(rv))
-
-	watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	defer watcher.Stop()
-
-	lastFoo := fooCreated
-	for i := 0; i < 11; i++ {
-		podFooForUpdate := makeTestPod("foo")
-		podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
-		lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
-	}
-
-	select {
-	case e := <-watcher.ResultChan():
-		object := e.Object
-		if co, ok := object.(runtime.CacheableObject); ok {
-			object = co.GetObject()
-		}
-		pod := object.(*example.Pod)
-		podRV, err := v.ParseResourceVersion(pod.ResourceVersion)
-		if err != nil {
-			t.Fatalf("unexpected error: %v", err)
-		}
-
-		// event should have at least rv + 1, since we're starting the watch at rv
-		if podRV <= rv {
-			t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
-		}
-	case <-time.After(wait.ForeverTestTimeout):
-		t.Errorf("timed out waiting for event")
-	}
-}
-
 func TestEmptyWatchEventCache(t *testing.T) {
 	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
 	defer server.Terminate(t)
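The deleted etcd-backed test above and the comment in its whitebox replacement make the same point: if a watch is closed in production, the client simply re-establishes it from the last resourceVersion it observed. A small sketch of that pattern against the same Cacher.Watch call used in both tests; the helper name is hypothetical and assumes the consumer has been tracking the resourceVersion of the last event it received:

// rewatchFrom re-creates a watch starting from the last resourceVersion the
// client has already seen; events delivered afterwards all have a higher RV,
// so nothing is lost or replayed. (Helper name assumed for illustration.)
func rewatchFrom(ctx context.Context, c *Cacher, key string, lastObservedRV uint64) (watch.Interface, error) {
	return c.Watch(ctx, key, storage.ListOptions{
		ResourceVersion: strconv.FormatUint(lastObservedRV, 10),
		Predicate:       storage.Everything,
	})
}

The test deliberately cannot use this escape hatch, because its whole point is to observe changes made after the original watch was started.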