From e24e3de379726c6f9aca999d32f0ea14757e5fda Mon Sep 17 00:00:00 2001
From: Gunju Kim
Date: Sat, 18 Feb 2023 03:04:59 +0900
Subject: [PATCH] Deflake tests in staging/src/k8s.io/kube-aggregator/pkg/apiserver

`waitForEmptyQueue` cannot guarantee that all items in the queue have been
synced completely; it only guarantees that they have all been started.
This adds `waitForQueueComplete` and implements `completerWorkqueue`, which
checks whether the workqueue has fully drained (nothing queued and nothing
in flight), to deflake the tests in
staging/src/k8s.io/kube-aggregator/pkg/apiserver.
---
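Background note (below the fold, not part of the commit message): the race
being fixed is inherent to the standard workqueue worker loop. A minimal
sketch of that pattern, with hypothetical `worker`/`syncFn` names rather
than code from this patch, shows why `Len() == 0` is not the same as
"fully synced":

	package sketch // illustrative only

	import "k8s.io/client-go/util/workqueue"

	// worker drains a queue the way controller workers typically do.
	func worker(queue workqueue.RateLimitingInterface, syncFn func(interface{})) {
		for {
			// Get() removes the item from the queue *before* it is
			// processed, so queue.Len() can already be 0 while a
			// sync is still in flight.
			item, shutdown := queue.Get()
			if shutdown {
				return
			}
			syncFn(item)
			// Only Done() marks the item finished. completerWorkqueue
			// tracks each item from Add() until Done(), so isComplete()
			// reports true only once nothing is queued or in flight.
			queue.Done(item)
		}
	}
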
 .../pkg/apiserver/handler_discovery_test.go   | 116 +++++++++++++++---
 1 file changed, 101 insertions(+), 15 deletions(-)

diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go
index e4fff6cdcd9..a909efe1247 100644
--- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go
+++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go
@@ -23,6 +23,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -38,19 +39,22 @@ import (
 	discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
 	scheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 )
 
 func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
-	return NewDiscoveryManager(rm).(*discoveryManager)
+	dm := NewDiscoveryManager(rm).(*discoveryManager)
+	dm.dirtyAPIServiceQueue = newCompleterWorkqueue(dm.dirtyAPIServiceQueue)
+
+	return dm
 }
 
-// Returns true if the queue of services to sync empty this means everything has
-// been reconciled and placed into merged document
-func waitForEmptyQueue(stopCh <-chan struct{}, dm *discoveryManager) bool {
+// Returns true if the queue of services to sync is complete, which means
+// everything has been reconciled and placed into the merged document
+func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
 	return cache.WaitForCacheSync(stopCh, func() bool {
-		// Once items have successfully synced they are removed from queue.
-		return dm.dirtyAPIServiceQueue.Len() == 0
+		return dm.dirtyAPIServiceQueue.(*completerWorkqueue).isComplete()
 	})
 }
 
@@ -104,7 +108,7 @@ func TestBasic(t *testing.T) {
 
 	go aggregatedManager.Run(testCtx.Done())
 
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	response, _, parsed := fetchPath(aggregatedResourceManager, "")
 	if response.StatusCode != 200 {
@@ -160,7 +164,44 @@ func TestDirty(t *testing.T) {
 	defer cancel()
 
 	go aggregatedManager.Run(testCtx.Done())
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
+
+	// immediately check for ping, since Run() should block for local services
+	if !pinged.Load() {
+		t.Errorf("service handler never pinged")
+	}
+}
+
+// Shows that waitForQueueComplete also waits for syncing to
+// complete by artificially making the sync handler take a long time
+func TestWaitForSync(t *testing.T) {
+	pinged := atomic.Bool{}
+	service := discoveryendpoint.NewResourceManager()
+	aggregatedResourceManager := discoveryendpoint.NewResourceManager()
+
+	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
+
+	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "v1.stable.example.com",
+		},
+		Spec: apiregistrationv1.APIServiceSpec{
+			Group:   "stable.example.com",
+			Version: "v1",
+			Service: &apiregistrationv1.ServiceReference{
+				Name: "test-service",
+			},
+		},
+	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		time.Sleep(3 * time.Second)
+		pinged.Store(true)
+		service.ServeHTTP(w, r)
+	}))
+	testCtx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go aggregatedManager.Run(testCtx.Done())
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	// immediately check for ping, since Run() should block for local services
 	if !pinged.Load() {
@@ -212,7 +253,7 @@ func TestRemoveAPIService(t *testing.T) {
 		aggregatedManager.RemoveAPIService(s.Name)
 	}
 
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	response, _, parsed := fetchPath(aggyService, "")
 	if response.StatusCode != 200 {
@@ -356,7 +397,7 @@ func TestLegacyFallbackNoCache(t *testing.T) {
 	defer cancel()
 
 	go aggregatedManager.Run(testCtx.Done())
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	// At this point external services have synced. Check if discovery document
 	// includes the legacy resources
@@ -465,7 +506,7 @@ func TestLegacyFallback(t *testing.T) {
 	defer cancel()
 
 	go aggregatedManager.Run(testCtx.Done())
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	// At this point external services have synced. Check if discovery document
 	// includes the legacy resources
@@ -534,10 +575,10 @@ func TestNotModified(t *testing.T) {
 
 	// Important to wait here to ensure we prime the cache with the initial list
 	// of documents in order to exercise 304 Not Modified
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
-	// Now add all groups. We excluded one group before so that AllServicesSynced
-	// could include it in this round. Now, if AllServicesSynced ever returns
+	// Now add all groups. We excluded one group before so that waitForQueueComplete
+	// could include it in this round. Now, if waitForQueueComplete ever returns
 	// true, it must have synced all the pre-existing groups before, which would
 	// return 304 Not Modified
 	for _, s := range apiServices {
@@ -545,7 +586,7 @@ func TestNotModified(t *testing.T) {
 	}
 
 	// This would wait the full timeout on 1.26.0.
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 }
 
 // copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
@@ -610,3 +651,48 @@ func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apid
 
 	return w.Result(), bytes, decoded
 }
+
+// completerWorkqueue is a workqueue.RateLimitingInterface that implements
+// isComplete
+type completerWorkqueue struct {
+	lock sync.Mutex
+	workqueue.RateLimitingInterface
+	processing map[interface{}]struct{}
+}
+
+var _ = workqueue.RateLimitingInterface(&completerWorkqueue{})
+
+func newCompleterWorkqueue(wq workqueue.RateLimitingInterface) *completerWorkqueue {
+	return &completerWorkqueue{
+		RateLimitingInterface: wq,
+		processing:            make(map[interface{}]struct{}),
+	}
+}
+
+func (q *completerWorkqueue) Add(item interface{}) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	q.processing[item] = struct{}{}
+	q.RateLimitingInterface.Add(item)
+}
+
+func (q *completerWorkqueue) AddAfter(item interface{}, duration time.Duration) {
+	q.Add(item)
+}
+
+func (q *completerWorkqueue) AddRateLimited(item interface{}) {
+	q.Add(item)
+}
+
+func (q *completerWorkqueue) Done(item interface{}) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	delete(q.processing, item)
+	q.RateLimitingInterface.Done(item)
+}
+
+func (q *completerWorkqueue) isComplete() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	return q.Len() == 0 && len(q.processing) == 0
+}
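
A note on verification (not part of the patch itself): with completerWorkqueue
in place, isComplete() returns true only once the queue is empty *and* no item
is between Get() and Done(), so waitForQueueComplete cannot observe completion
while a slow sync such as TestWaitForSync's 3-second handler is still running.
Assuming a standard Kubernetes checkout, the affected tests can presumably be
stress-run from the repository root with something like:

	go test ./staging/src/k8s.io/kube-aggregator/pkg/apiserver/... \
		-run 'TestWaitForSync|TestNotModified' -race -count=50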