Merge pull request #115859 from gjkim42/deflake-TestDirty

Deflake tests in `staging/src/k8s.io/kube-aggregator/pkg/apiserver`
This commit is contained in:
Kubernetes Prow Robot 2023-03-01 10:09:19 -08:00 committed by GitHub
commit e2fff53f43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,6 +23,8 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -37,19 +39,22 @@ import (
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
scheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)
func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
return NewDiscoveryManager(rm).(*discoveryManager)
dm := NewDiscoveryManager(rm).(*discoveryManager)
dm.dirtyAPIServiceQueue = newCompleterWorkqueue(dm.dirtyAPIServiceQueue)
return dm
}
// Returns true if the queue of services to sync empty this means everything has
// been reconciled and placed into merged document
func waitForEmptyQueue(stopCh <-chan struct{}, dm *discoveryManager) bool {
// Returns true if the queue of services to sync is complete which means
// everything has been reconciled and placed into merged document
func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
return cache.WaitForCacheSync(stopCh, func() bool {
// Once items have successfully synced they are removed from queue.
return dm.dirtyAPIServiceQueue.Len() == 0
return dm.dirtyAPIServiceQueue.(*completerWorkqueue).isComplete()
})
}
@ -103,7 +108,7 @@ func TestBasic(t *testing.T) {
go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
response, _, parsed := fetchPath(aggregatedResourceManager, "")
if response.StatusCode != 200 {
@ -134,7 +139,7 @@ func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList,
// Test that a handler associated with an APIService gets pinged after the
// APIService has been marked as dirty
func TestDirty(t *testing.T) {
pinged := false
var pinged atomic.Bool
service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
@ -152,17 +157,54 @@ func TestDirty(t *testing.T) {
},
},
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pinged = true
pinged.Store(true)
service.ServeHTTP(w, r)
}))
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// immediately check for ping, since Run() should block for local services
if !pinged {
if !pinged.Load() {
t.Errorf("service handler never pinged")
}
}
// Shows that waitForQueueComplete also waits for syncing to
// complete by artificially making the sync handler take a long time
func TestWaitForSync(t *testing.T) {
pinged := atomic.Bool{}
service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "v1.stable.example.com",
},
Spec: apiregistrationv1.APIServiceSpec{
Group: "stable.example.com",
Version: "v1",
Service: &apiregistrationv1.ServiceReference{
Name: "test-service",
},
},
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(3 * time.Second)
pinged.Store(true)
service.ServeHTTP(w, r)
}))
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// immediately check for ping, since Run() should block for local services
if !pinged.Load() {
t.Errorf("service handler never pinged")
}
}
@ -211,7 +253,7 @@ func TestRemoveAPIService(t *testing.T) {
aggregatedManager.RemoveAPIService(s.Name)
}
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
response, _, parsed := fetchPath(aggyService, "")
if response.StatusCode != 200 {
@ -355,7 +397,7 @@ func TestLegacyFallbackNoCache(t *testing.T) {
defer cancel()
go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// At this point external services have synced. Check if discovery document
// includes the legacy resources
@ -464,7 +506,7 @@ func TestLegacyFallback(t *testing.T) {
defer cancel()
go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// At this point external services have synced. Check if discovery document
// includes the legacy resources
@ -533,10 +575,10 @@ func TestNotModified(t *testing.T) {
// Important to wait here to ensure we prime the cache with the initial list
// of documents in order to exercise 304 Not Modified
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// Now add all groups. We excluded one group before so that AllServicesSynced
// could include it in this round. Now, if AllServicesSynced ever returns
// Now add all groups. We excluded one group before so that waitForQueueIsComplete
// could include it in this round. Now, if waitForQueueIsComplete ever returns
// true, it must have synced all the pre-existing groups before, which would
// return 304 Not Modified
for _, s := range apiServices {
@ -544,7 +586,7 @@ func TestNotModified(t *testing.T) {
}
// This would wait the full timeout on 1.26.0.
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
}
// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
@ -609,3 +651,48 @@ func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apid
return w.Result(), bytes, decoded
}
// completerWorkqueue is a workqueue.RateLimitingInterface that implements
// isComplete
type completerWorkqueue struct {
lock sync.Mutex
workqueue.RateLimitingInterface
processing map[interface{}]struct{}
}
var _ = workqueue.RateLimitingInterface(&completerWorkqueue{})
func newCompleterWorkqueue(wq workqueue.RateLimitingInterface) *completerWorkqueue {
return &completerWorkqueue{
RateLimitingInterface: wq,
processing: make(map[interface{}]struct{}),
}
}
func (q *completerWorkqueue) Add(item interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
q.processing[item] = struct{}{}
q.RateLimitingInterface.Add(item)
}
func (q *completerWorkqueue) AddAfter(item interface{}, duration time.Duration) {
q.Add(item)
}
func (q *completerWorkqueue) AddRateLimited(item interface{}) {
q.Add(item)
}
func (q *completerWorkqueue) Done(item interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
delete(q.processing, item)
q.RateLimitingInterface.Done(item)
}
func (q *completerWorkqueue) isComplete() bool {
q.lock.Lock()
defer q.lock.Unlock()
return q.Len() == 0 && len(q.processing) == 0
}