mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 08:17:26 +00:00
Aggregated Discovery, mark stale before initial health check
This commit is contained in:
parent
5cef5f1a57
commit
6726e65619
@ -343,6 +343,42 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
|
||||||
|
s.discoveryAggregationController = NewDiscoveryManager(
|
||||||
|
// Use aggregator as the source name to avoid overwriting native/CRD
|
||||||
|
// groups
|
||||||
|
s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Setup discovery endpoint
|
||||||
|
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||||
|
// Discovery aggregation depends on the apiservice registration controller
|
||||||
|
// having the full list of APIServices already synced
|
||||||
|
select {
|
||||||
|
case <-context.StopCh:
|
||||||
|
return nil
|
||||||
|
// Context cancelled, should abort/clean goroutines
|
||||||
|
case <-apiServiceRegistrationControllerInitiated:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run discovery manager's worker to watch for new/removed/updated
|
||||||
|
// APIServices to the discovery document can be updated at runtime
|
||||||
|
// When discovery is ready, all APIServices will be present, with APIServices
|
||||||
|
// that have not successfully synced discovery to be present but marked as Stale.
|
||||||
|
discoverySyncedCh := make(chan struct{})
|
||||||
|
go s.discoveryAggregationController.Run(context.StopCh, discoverySyncedCh)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-context.StopCh:
|
||||||
|
return nil
|
||||||
|
// Context cancelled, should abort/clean goroutines
|
||||||
|
case <-discoverySyncedCh:
|
||||||
|
// API services successfully sync
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
|
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
|
||||||
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
|
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
|
||||||
// Spawn a goroutine in aggregator apiserver to update storage version for
|
// Spawn a goroutine in aggregator apiserver to update storage version for
|
||||||
@ -414,22 +450,6 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
|
|
||||||
s.discoveryAggregationController = NewDiscoveryManager(
|
|
||||||
// Use aggregator as the source name to avoid overwriting native/CRD
|
|
||||||
// groups
|
|
||||||
s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Setup discovery endpoint
|
|
||||||
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {
|
|
||||||
// Run discovery manager's worker to watch for new/removed/updated
|
|
||||||
// APIServices to the discovery document can be updated at runtime
|
|
||||||
go s.discoveryAggregationController.Run(context.StopCh)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
prepared := s.GenericAPIServer.PrepareRun()
|
prepared := s.GenericAPIServer.PrepareRun()
|
||||||
|
|
||||||
// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers
|
// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers
|
||||||
|
@ -67,9 +67,9 @@ type DiscoveryAggregationController interface {
|
|||||||
// Thread-safe
|
// Thread-safe
|
||||||
RemoveAPIService(apiServiceName string)
|
RemoveAPIService(apiServiceName string)
|
||||||
|
|
||||||
// Spwans a worker which waits for added/updated apiservices and updates
|
// Spawns a worker which waits for added/updated apiservices and updates
|
||||||
// the unified discovery document by contacting the aggregated api services
|
// the unified discovery document by contacting the aggregated api services
|
||||||
Run(stopCh <-chan struct{})
|
Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
type discoveryManager struct {
|
type discoveryManager struct {
|
||||||
@ -406,14 +406,45 @@ func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spwans a goroutune which waits for added/updated apiservices and updates
|
func (dm *discoveryManager) getAPIServiceKeys() []string {
|
||||||
|
dm.servicesLock.RLock()
|
||||||
|
defer dm.servicesLock.RUnlock()
|
||||||
|
keys := []string{}
|
||||||
|
for key := range dm.apiServices {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawns a goroutine which waits for added/updated apiservices and updates
|
||||||
// the discovery document accordingly
|
// the discovery document accordingly
|
||||||
func (dm *discoveryManager) Run(stopCh <-chan struct{}) {
|
func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{}) {
|
||||||
klog.Info("Starting ResourceDiscoveryManager")
|
klog.Info("Starting ResourceDiscoveryManager")
|
||||||
|
|
||||||
// Shutdown the queue since stopCh was signalled
|
// Shutdown the queue since stopCh was signalled
|
||||||
defer dm.dirtyAPIServiceQueue.ShutDown()
|
defer dm.dirtyAPIServiceQueue.ShutDown()
|
||||||
|
|
||||||
|
// Ensure that apiregistration.k8s.io is the first group in the discovery group.
|
||||||
|
dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
|
||||||
|
|
||||||
|
// Ensure that all APIServices are present before readiness check succeeds
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
// Iterate on a copy of the keys to be thread safe with syncAPIService
|
||||||
|
keys := dm.getAPIServiceKeys()
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(k string) {
|
||||||
|
defer wg.Done()
|
||||||
|
// If an error was returned, the APIService will still have been
|
||||||
|
// added but marked as stale. Ignore the return value here
|
||||||
|
_ = dm.syncAPIService(k)
|
||||||
|
}(key)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
close(discoverySyncedCh)
|
||||||
|
|
||||||
// Spawn workers
|
// Spawn workers
|
||||||
// These workers wait for APIServices to be marked dirty.
|
// These workers wait for APIServices to be marked dirty.
|
||||||
// Worker ensures the cached discovery document hosted by the ServiceReference of
|
// Worker ensures the cached discovery document hosted by the ServiceReference of
|
||||||
@ -440,9 +471,6 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that apiregistration.k8s.io is the first group in the discovery group.
|
|
||||||
dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
|
|
||||||
|
|
||||||
wait.PollUntil(1*time.Minute, func() (done bool, err error) {
|
wait.PollUntil(1*time.Minute, func() (done bool, err error) {
|
||||||
dm.servicesLock.Lock()
|
dm.servicesLock.Lock()
|
||||||
defer dm.servicesLock.Unlock()
|
defer dm.servicesLock.Unlock()
|
||||||
|
@ -61,6 +61,10 @@ func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fakeCh() chan struct{} {
|
||||||
|
return make(chan struct{})
|
||||||
|
}
|
||||||
|
|
||||||
// Test that the discovery manager starts and aggregates from two local API services
|
// Test that the discovery manager starts and aggregates from two local API services
|
||||||
func TestBasic(t *testing.T) {
|
func TestBasic(t *testing.T) {
|
||||||
service1 := discoveryendpoint.NewResourceManager("apis")
|
service1 := discoveryendpoint.NewResourceManager("apis")
|
||||||
@ -204,7 +208,7 @@ func TestBasic(t *testing.T) {
|
|||||||
testCtx, testCancel := context.WithCancel(context.Background())
|
testCtx, testCancel := context.WithCancel(context.Background())
|
||||||
defer testCancel()
|
defer testCancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
|
|
||||||
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
||||||
|
|
||||||
@ -240,6 +244,62 @@ func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestInitialRunHasAllAPIServices tests that when discovery is ready, all APIService
|
||||||
|
// are present and ones that have not synced are in the list as Stale.
|
||||||
|
func TestInitialRunHasAllAPIServices(t *testing.T) {
|
||||||
|
neverReturnCh := make(chan struct{})
|
||||||
|
defer close(neverReturnCh)
|
||||||
|
service := discoveryendpoint.NewResourceManager("apis")
|
||||||
|
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
|
||||||
|
|
||||||
|
aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
|
||||||
|
|
||||||
|
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "v1.stable.example.com",
|
||||||
|
},
|
||||||
|
Spec: apiregistrationv1.APIServiceSpec{
|
||||||
|
Group: "stable.example.com",
|
||||||
|
Version: "v1",
|
||||||
|
Service: &apiregistrationv1.ServiceReference{
|
||||||
|
Name: "test-service",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
<-neverReturnCh
|
||||||
|
service.ServeHTTP(w, r)
|
||||||
|
}))
|
||||||
|
testCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
initialSyncedCh := fakeCh()
|
||||||
|
go aggregatedManager.Run(testCtx.Done(), initialSyncedCh)
|
||||||
|
select {
|
||||||
|
case <-initialSyncedCh:
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
t.Fatal("timed out waiting for initial sync")
|
||||||
|
}
|
||||||
|
|
||||||
|
response, _, parsed := fetchPath(aggregatedResourceManager, "")
|
||||||
|
if response.StatusCode != 200 {
|
||||||
|
t.Fatalf("unexpected status code %d", response.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
apiGroup := apidiscoveryv2beta1.APIGroupDiscoveryList{Items: []apidiscoveryv2beta1.APIGroupDiscovery{
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "stable.example.com"},
|
||||||
|
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
|
||||||
|
{
|
||||||
|
Version: "v1",
|
||||||
|
Freshness: "Stale",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
|
||||||
|
checkAPIGroups(t, apiGroup, parsed)
|
||||||
|
}
|
||||||
|
|
||||||
// Test that a handler associated with an APIService gets pinged after the
|
// Test that a handler associated with an APIService gets pinged after the
|
||||||
// APIService has been marked as dirty
|
// APIService has been marked as dirty
|
||||||
func TestDirty(t *testing.T) {
|
func TestDirty(t *testing.T) {
|
||||||
@ -267,7 +327,7 @@ func TestDirty(t *testing.T) {
|
|||||||
testCtx, cancel := context.WithCancel(context.Background())
|
testCtx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
||||||
|
|
||||||
// immediately check for ping, since Run() should block for local services
|
// immediately check for ping, since Run() should block for local services
|
||||||
@ -304,7 +364,7 @@ func TestWaitForSync(t *testing.T) {
|
|||||||
testCtx, cancel := context.WithCancel(context.Background())
|
testCtx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
||||||
|
|
||||||
// immediately check for ping, since Run() should block for local services
|
// immediately check for ping, since Run() should block for local services
|
||||||
@ -351,7 +411,7 @@ func TestRemoveAPIService(t *testing.T) {
|
|||||||
testCtx, testCancel := context.WithCancel(context.Background())
|
testCtx, testCancel := context.WithCancel(context.Background())
|
||||||
defer testCancel()
|
defer testCancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
|
|
||||||
for _, s := range apiServices {
|
for _, s := range apiServices {
|
||||||
aggregatedManager.RemoveAPIService(s.Name)
|
aggregatedManager.RemoveAPIService(s.Name)
|
||||||
@ -527,7 +587,7 @@ func TestLegacyFallbackNoCache(t *testing.T) {
|
|||||||
testCtx, cancel := context.WithCancel(context.Background())
|
testCtx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
||||||
|
|
||||||
// At this point external services have synced. Check if discovery document
|
// At this point external services have synced. Check if discovery document
|
||||||
@ -665,7 +725,7 @@ func testLegacyFallbackWithCustomRootHandler(t *testing.T, rootHandlerFn func(ht
|
|||||||
testCtx, cancel := context.WithCancel(context.Background())
|
testCtx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
||||||
|
|
||||||
// At this point external services have synced. Check if discovery document
|
// At this point external services have synced. Check if discovery document
|
||||||
@ -745,7 +805,7 @@ func TestAPIServiceStale(t *testing.T) {
|
|||||||
testCtx, cancel := context.WithCancel(context.Background())
|
testCtx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
||||||
|
|
||||||
// At this point external services have synced. Check if discovery document
|
// At this point external services have synced. Check if discovery document
|
||||||
@ -807,7 +867,7 @@ func TestNotModified(t *testing.T) {
|
|||||||
testCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
testCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go aggregatedManager.Run(testCtx.Done())
|
go aggregatedManager.Run(testCtx.Done(), fakeCh())
|
||||||
|
|
||||||
// Important to wait here to ensure we prime the cache with the initial list
|
// Important to wait here to ensure we prime the cache with the initial list
|
||||||
// of documents in order to exercise 304 Not Modified
|
// of documents in order to exercise 304 Not Modified
|
||||||
|
@ -121,6 +121,18 @@ var (
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
basicTestGroupStale = apidiscoveryv2beta1.APIGroupDiscovery{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "stable.example.com",
|
||||||
|
},
|
||||||
|
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
|
||||||
|
{
|
||||||
|
Version: "v1",
|
||||||
|
Freshness: apidiscoveryv2beta1.DiscoveryFreshnessStale,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
stableGroup = "stable.example.com"
|
stableGroup = "stable.example.com"
|
||||||
stableV1 = metav1.GroupVersion{Group: stableGroup, Version: "v1"}
|
stableV1 = metav1.GroupVersion{Group: stableGroup, Version: "v1"}
|
||||||
stableV1alpha1 = metav1.GroupVersion{Group: stableGroup, Version: "v1alpha1"}
|
stableV1alpha1 = metav1.GroupVersion{Group: stableGroup, Version: "v1alpha1"}
|
||||||
@ -172,6 +184,64 @@ func setup(t *testing.T) (context.Context, testClientSet, context.CancelFunc) {
|
|||||||
return ctx, client, cancelCtx
|
return ctx, client, cancelCtx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadinessAggregatedAPIServiceDiscovery(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
|
||||||
|
|
||||||
|
// Keep any goroutines spawned from running past the execution of this test
|
||||||
|
ctx, client, cleanup := setup(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Create a resource manager whichs serves our GroupVersion
|
||||||
|
resourceManager := discoveryendpoint.NewResourceManager("apis")
|
||||||
|
resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup})
|
||||||
|
|
||||||
|
apiServiceWaitCh := make(chan struct{})
|
||||||
|
|
||||||
|
// Install our ResourceManager as an Aggregated APIService to the
|
||||||
|
// test server
|
||||||
|
service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if strings.HasPrefix(r.URL.Path, "/apis/stable.example.com") {
|
||||||
|
// Return invalid response so APIService can be marked as "available"
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
} else if strings.HasPrefix(r.URL.Path, "/apis") {
|
||||||
|
select {
|
||||||
|
case <-apiServiceWaitCh:
|
||||||
|
// Hang responding to discovery until aggregated discovery document contains the aggregated group marked as Stale.
|
||||||
|
resourceManager.ServeHTTP(w, r)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// reject openapi/v2, openapi/v3, apis/<group>/<version>
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
go func() {
|
||||||
|
require.NoError(t, service.Run(ctx))
|
||||||
|
}()
|
||||||
|
require.NoError(t, service.WaitForReady(ctx))
|
||||||
|
|
||||||
|
// For each groupversion served by our resourcemanager, create an APIService
|
||||||
|
// object connected to our fake APIServer
|
||||||
|
for _, versionInfo := range basicTestGroup.Versions {
|
||||||
|
groupVersion := metav1.GroupVersion{
|
||||||
|
Group: basicTestGroup.Name,
|
||||||
|
Version: versionInfo.Version,
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, registerAPIService(ctx, client, groupVersion, service))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep repeatedly fetching document from aggregator.
|
||||||
|
// Check to see if it initially contains the aggregated group as stale
|
||||||
|
require.NoError(t, WaitForGroups(ctx, client, basicTestGroupStale))
|
||||||
|
require.NoError(t, WaitForRootPaths(t, ctx, client, sets.New("/apis/"+basicTestGroup.Name), nil))
|
||||||
|
|
||||||
|
// Allow the APIService to start responding and ensure that Freshness is updated when the APIService is reacheable.
|
||||||
|
close(apiServiceWaitCh)
|
||||||
|
require.NoError(t, WaitForGroups(ctx, client, basicTestGroupWithFixup))
|
||||||
|
}
|
||||||
|
|
||||||
func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service FakeService) error {
|
func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service FakeService) error {
|
||||||
port := service.Port()
|
port := service.Port()
|
||||||
if port == nil {
|
if port == nil {
|
||||||
|
@ -601,13 +601,18 @@ func WaitForRootPaths(t *testing.T, ctx context.Context, client testClient, requ
|
|||||||
func WaitForGroups(ctx context.Context, client testClient, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error {
|
func WaitForGroups(ctx context.Context, client testClient, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error {
|
||||||
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
|
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
|
||||||
for _, searchGroup := range groups {
|
for _, searchGroup := range groups {
|
||||||
|
found := false
|
||||||
for _, docGroup := range groupList.Items {
|
for _, docGroup := range groupList.Items {
|
||||||
if reflect.DeepEqual(searchGroup, docGroup) {
|
if reflect.DeepEqual(searchGroup, docGroup) {
|
||||||
return true
|
found = true
|
||||||
}
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !found {
|
||||||
return false
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user