Deflake tests in staging/src/k8s.io/kube-aggregator/pkg/apiserver

`waitForEmptyQueue` cannot guarantee that every item in the queue has
been synced completely; it only guarantees that every item has been started.

This adds `waitForQueueComplete` and implements `completerWorkqueue`,
which tracks in-flight items so the tests can check whether the workqueue
is truly complete, deflaking the tests in
staging/src/k8s.io/kube-aggregator/pkg/apiserver.
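
The flakes stem from the semantics of client-go's workqueue: `Get()`
removes an item from the queue before the worker has finished (or even
started) syncing it, so `Len()` can already be zero while a sync is in
flight. A minimal sketch of the race, illustrative only and not part of
this change:

    // import "k8s.io/client-go/util/workqueue"
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    q.Add("v1.stable.example.com")

    // Get() hands the item to a worker and removes it from the queue,
    // so the queue already reports empty here...
    item, _ := q.Get() // q.Len() == 0 from this point on

    // ...and a test that polls q.Len() == 0 stops waiting now, even
    // though the item has not been synced yet; only Done() marks the
    // end of processing.
    q.Done(item)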
Gunju Kim 2023-02-18 03:04:59 +09:00
parent e7d8dfb5a0
commit e24e3de379

@@ -23,6 +23,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -38,19 +39,22 @@ import (
 	discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
 	scheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 )
 
 func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
-	return NewDiscoveryManager(rm).(*discoveryManager)
+	dm := NewDiscoveryManager(rm).(*discoveryManager)
+	dm.dirtyAPIServiceQueue = newCompleterWorkqueue(dm.dirtyAPIServiceQueue)
+	return dm
 }
 
-// Returns true if the queue of services to sync empty this means everything has
-// been reconciled and placed into merged document
-func waitForEmptyQueue(stopCh <-chan struct{}, dm *discoveryManager) bool {
+// Returns true if the queue of services to sync is complete which means
+// everything has been reconciled and placed into merged document
+func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
 	return cache.WaitForCacheSync(stopCh, func() bool {
-		// Once items have successfully synced they are removed from queue.
-		return dm.dirtyAPIServiceQueue.Len() == 0
+		return dm.dirtyAPIServiceQueue.(*completerWorkqueue).isComplete()
 	})
 }
@@ -104,7 +108,7 @@ func TestBasic(t *testing.T) {
 	go aggregatedManager.Run(testCtx.Done())
 
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	response, _, parsed := fetchPath(aggregatedResourceManager, "")
 	if response.StatusCode != 200 {
@@ -160,7 +164,44 @@ func TestDirty(t *testing.T) {
 	defer cancel()
 	go aggregatedManager.Run(testCtx.Done())
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	// immediately check for ping, since Run() should block for local services
 	if !pinged.Load() {
 		t.Errorf("service handler never pinged")
 	}
 }
 
+// Shows that waitForQueueComplete also waits for syncing to
+// complete by artificially making the sync handler take a long time
+func TestWaitForSync(t *testing.T) {
+	pinged := atomic.Bool{}
+	service := discoveryendpoint.NewResourceManager()
+	aggregatedResourceManager := discoveryendpoint.NewResourceManager()
+
+	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
+
+	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "v1.stable.example.com",
+		},
+		Spec: apiregistrationv1.APIServiceSpec{
+			Group:   "stable.example.com",
+			Version: "v1",
+			Service: &apiregistrationv1.ServiceReference{
+				Name: "test-service",
+			},
+		},
+	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		time.Sleep(3 * time.Second)
+		pinged.Store(true)
+		service.ServeHTTP(w, r)
+	}))
+	testCtx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go aggregatedManager.Run(testCtx.Done())
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
+
 	// immediately check for ping, since Run() should block for local services
 	if !pinged.Load() {
@@ -212,7 +253,7 @@ func TestRemoveAPIService(t *testing.T) {
 		aggregatedManager.RemoveAPIService(s.Name)
 	}
 
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	response, _, parsed := fetchPath(aggyService, "")
 	if response.StatusCode != 200 {
@@ -356,7 +397,7 @@ func TestLegacyFallbackNoCache(t *testing.T) {
 	defer cancel()
 	go aggregatedManager.Run(testCtx.Done())
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	// At this point external services have synced. Check if discovery document
 	// includes the legacy resources
@@ -465,7 +506,7 @@ func TestLegacyFallback(t *testing.T) {
 	defer cancel()
 	go aggregatedManager.Run(testCtx.Done())
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
 	// At this point external services have synced. Check if discovery document
 	// includes the legacy resources
@@ -534,10 +575,10 @@ func TestNotModified(t *testing.T) {
 	// Important to wait here to ensure we prime the cache with the initial list
 	// of documents in order to exercise 304 Not Modified
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 
-	// Now add all groups. We excluded one group before so that AllServicesSynced
-	// could include it in this round. Now, if AllServicesSynced ever returns
+	// Now add all groups. We excluded one group before so that waitForQueueComplete
+	// could include it in this round. Now, if waitForQueueComplete ever returns
 	// true, it must have synced all the pre-existing groups before, which would
 	// return 304 Not Modified
 	for _, s := range apiServices {
@@ -545,7 +586,7 @@ func TestNotModified(t *testing.T) {
 	}
 
 	// This would wait the full timeout on 1.26.0.
-	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
+	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 }
 
 // copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
@@ -610,3 +651,48 @@ func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apid
 
 	return w.Result(), bytes, decoded
 }
+
+// completerWorkqueue is a workqueue.RateLimitingInterface that implements
+// isComplete
+type completerWorkqueue struct {
+	lock sync.Mutex
+	workqueue.RateLimitingInterface
+	processing map[interface{}]struct{}
+}
+
+var _ = workqueue.RateLimitingInterface(&completerWorkqueue{})
+
+func newCompleterWorkqueue(wq workqueue.RateLimitingInterface) *completerWorkqueue {
+	return &completerWorkqueue{
+		RateLimitingInterface: wq,
+		processing:            make(map[interface{}]struct{}),
+	}
+}
+
+func (q *completerWorkqueue) Add(item interface{}) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	q.processing[item] = struct{}{}
+	q.RateLimitingInterface.Add(item)
+}
+
+func (q *completerWorkqueue) AddAfter(item interface{}, duration time.Duration) {
+	q.Add(item)
+}
+
+func (q *completerWorkqueue) AddRateLimited(item interface{}) {
+	q.Add(item)
+}
+
+func (q *completerWorkqueue) Done(item interface{}) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	delete(q.processing, item)
+	q.RateLimitingInterface.Done(item)
+}
+
+func (q *completerWorkqueue) isComplete() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	return q.Len() == 0 && len(q.processing) == 0
+}