mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-08 11:38:15 +00:00
Test that separation of streams work by using progress notifies
This commit is contained in:
parent
31d404b182
commit
1cf4cec449
@ -29,6 +29,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
@ -2345,3 +2346,128 @@ func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchStreamSeparation(t *testing.T) {
|
||||||
|
tcs := []struct {
|
||||||
|
name string
|
||||||
|
separateCacheWatchRPC bool
|
||||||
|
useWatchCacheContextMetadata bool
|
||||||
|
expectBookmarkOnWatchCache bool
|
||||||
|
expectBookmarkOnEtcd bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "common RPC > both get bookmarks",
|
||||||
|
separateCacheWatchRPC: false,
|
||||||
|
expectBookmarkOnEtcd: true,
|
||||||
|
expectBookmarkOnWatchCache: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "common RPC & watch cache context > both get bookmarks",
|
||||||
|
separateCacheWatchRPC: false,
|
||||||
|
useWatchCacheContextMetadata: true,
|
||||||
|
expectBookmarkOnEtcd: true,
|
||||||
|
expectBookmarkOnWatchCache: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "separate RPC > only etcd gets bookmarks",
|
||||||
|
separateCacheWatchRPC: true,
|
||||||
|
expectBookmarkOnEtcd: true,
|
||||||
|
expectBookmarkOnWatchCache: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "separate RPC & watch cache context > only watch cache gets bookmarks",
|
||||||
|
separateCacheWatchRPC: true,
|
||||||
|
useWatchCacheContextMetadata: true,
|
||||||
|
expectBookmarkOnEtcd: false,
|
||||||
|
expectBookmarkOnWatchCache: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)()
|
||||||
|
_, cacher, _, terminate := testSetupWithEtcdServer(t)
|
||||||
|
t.Cleanup(terminate)
|
||||||
|
if err := cacher.ready.wait(context.TODO()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
getCacherRV := func() uint64 {
|
||||||
|
cacher.watchCache.RLock()
|
||||||
|
defer cacher.watchCache.RUnlock()
|
||||||
|
return cacher.watchCache.resourceVersion
|
||||||
|
}
|
||||||
|
waitContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage)
|
||||||
|
|
||||||
|
var out example.Pod
|
||||||
|
err := cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
|
var lastResourceVersion uint64
|
||||||
|
lastResourceVersion, err = versioner.ObjectResourceVersion(&out)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var contextMetadata metadata.MD
|
||||||
|
if tc.useWatchCacheContextMetadata {
|
||||||
|
contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
|
||||||
|
}
|
||||||
|
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
|
||||||
|
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix.
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
err = cacher.storage.RequestWatchProgress(metadata.NewOutgoingContext(context.Background(), contextMetadata))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// Give time for bookmark to arrive
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
etcdWatchResourceVersion := waitForEtcdBookmark()
|
||||||
|
gotEtcdWatchBookmark := etcdWatchResourceVersion == lastResourceVersion
|
||||||
|
if gotEtcdWatchBookmark != tc.expectBookmarkOnEtcd {
|
||||||
|
t.Errorf("Unexpected etcd bookmark check result, rv: %d, got: %v, want: %v", etcdWatchResourceVersion, etcdWatchResourceVersion, tc.expectBookmarkOnEtcd)
|
||||||
|
}
|
||||||
|
|
||||||
|
watchCacheResourceVersion := getCacherRV()
|
||||||
|
cacherGotBookmark := watchCacheResourceVersion == lastResourceVersion
|
||||||
|
if cacherGotBookmark != tc.expectBookmarkOnWatchCache {
|
||||||
|
t.Errorf("Unexpected watch cache bookmark check result, rv: %d, got: %v, want: %v", watchCacheResourceVersion, cacherGotBookmark, tc.expectBookmarkOnWatchCache)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
|
||||||
|
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
|
||||||
|
opts.Predicate.AllowWatchBookmarks = true
|
||||||
|
w, err := etcdStorage.Watch(ctx, "/pods/", opts)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
|
var rv uint64
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for event := range w.ResultChan() {
|
||||||
|
if event.Type == watch.Bookmark {
|
||||||
|
rv, err = versioner.ObjectResourceVersion(event.Object)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return func() (resourceVersion uint64) {
|
||||||
|
defer w.Stop()
|
||||||
|
wg.Wait()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return rv
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user