diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index f0f2b7140b4..ff7421726d4 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -343,6 +343,42 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + s.discoveryAggregationController = NewDiscoveryManager( + // Use aggregator as the source name to avoid overwriting native/CRD + // groups + s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource), + ) + + // Setup discovery endpoint + s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error { + // Discovery aggregation depends on the apiservice registration controller + // having the full list of APIServices already synced + select { + case <-context.StopCh: + return nil + // Context cancelled, should abort/clean goroutines + case <-apiServiceRegistrationControllerInitiated: + } + + // Run discovery manager's worker to watch for new/removed/updated + // APIServices so the discovery document can be updated at runtime + // When discovery is ready, all APIServices will be present, with APIServices + // that have not successfully synced discovery still present but marked as Stale. 
+ discoverySyncedCh := make(chan struct{}) + go s.discoveryAggregationController.Run(context.StopCh, discoverySyncedCh) + + select { + case <-context.StopCh: + return nil + // Context cancelled, should abort/clean goroutines + case <-discoverySyncedCh: + // API services successfully sync + } + return nil + }) + } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { // Spawn a goroutine in aggregator apiserver to update storage version for @@ -414,22 +450,6 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { }) } - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { - s.discoveryAggregationController = NewDiscoveryManager( - // Use aggregator as the source name to avoid overwriting native/CRD - // groups - s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource), - ) - - // Setup discovery endpoint - s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error { - // Run discovery manager's worker to watch for new/removed/updated - // APIServices to the discovery document can be updated at runtime - go s.discoveryAggregationController.Run(context.StopCh) - return nil - }) - } - prepared := s.GenericAPIServer.PrepareRun() // delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go index c307c1f0ed5..f4d7350f3bf 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go @@ -67,9 +67,9 @@ type DiscoveryAggregationController interface { // Thread-safe RemoveAPIService(apiServiceName string) - // Spwans a worker which waits for 
added/updated apiservices and updates + // Spawns a worker which waits for added/updated apiservices and updates // the unified discovery document by contacting the aggregated api services - Run(stopCh <-chan struct{}) + Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{}) } type discoveryManager struct { @@ -406,14 +406,45 @@ func (dm *discoveryManager) syncAPIService(apiServiceName string) error { return nil } -// Spwans a goroutune which waits for added/updated apiservices and updates +func (dm *discoveryManager) getAPIServiceKeys() []string { + dm.servicesLock.RLock() + defer dm.servicesLock.RUnlock() + keys := []string{} + for key := range dm.apiServices { + keys = append(keys, key) + } + return keys +} + +// Spawns a goroutine which waits for added/updated apiservices and updates // the discovery document accordingly -func (dm *discoveryManager) Run(stopCh <-chan struct{}) { +func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{}) { klog.Info("Starting ResourceDiscoveryManager") // Shutdown the queue since stopCh was signalled defer dm.dirtyAPIServiceQueue.ShutDown() + // Ensure that apiregistration.k8s.io is the first group in the discovery group. + dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0) + + // Ensure that all APIServices are present before readiness check succeeds + var wg sync.WaitGroup + // Iterate on a copy of the keys to be thread safe with syncAPIService + keys := dm.getAPIServiceKeys() + + for _, key := range keys { + wg.Add(1) + go func(k string) { + defer wg.Done() + // If an error was returned, the APIService will still have been + // added but marked as stale. Ignore the return value here + _ = dm.syncAPIService(k) + }(key) + } + wg.Wait() + + close(discoverySyncedCh) + // Spawn workers // These workers wait for APIServices to be marked dirty. 
// Worker ensures the cached discovery document hosted by the ServiceReference of @@ -440,9 +471,6 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}) { }() } - // Ensure that apiregistration.k8s.io is the first group in the discovery group. - dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0) - wait.PollUntil(1*time.Minute, func() (done bool, err error) { dm.servicesLock.Lock() defer dm.servicesLock.Unlock() diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go index da61ab6ce85..a7b42aaec6c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go @@ -61,6 +61,10 @@ func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool { }) } +func fakeCh() chan struct{} { + return make(chan struct{}) +} + // Test that the discovery manager starts and aggregates from two local API services func TestBasic(t *testing.T) { service1 := discoveryendpoint.NewResourceManager("apis") @@ -204,7 +208,7 @@ func TestBasic(t *testing.T) { testCtx, testCancel := context.WithCancel(context.Background()) defer testCancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) @@ -240,6 +244,62 @@ func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList, } } +// TestInitialRunHasAllAPIServices tests that when discovery is ready, all APIServices +// are present and ones that have not synced are in the list as Stale. 
+func TestInitialRunHasAllAPIServices(t *testing.T) { + neverReturnCh := make(chan struct{}) + defer close(neverReturnCh) + service := discoveryendpoint.NewResourceManager("apis") + aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis") + + aggregatedManager := newDiscoveryManager(aggregatedResourceManager) + + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service", + }, + }, + }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-neverReturnCh + service.ServeHTTP(w, r) + })) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initialSyncedCh := fakeCh() + go aggregatedManager.Run(testCtx.Done(), initialSyncedCh) + select { + case <-initialSyncedCh: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for initial sync") + } + + response, _, parsed := fetchPath(aggregatedResourceManager, "") + if response.StatusCode != 200 { + t.Fatalf("unexpected status code %d", response.StatusCode) + } + + apiGroup := apidiscoveryv2beta1.APIGroupDiscoveryList{Items: []apidiscoveryv2beta1.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{Name: "stable.example.com"}, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ + { + Version: "v1", + Freshness: "Stale", + }, + }, + }, + }} + + checkAPIGroups(t, apiGroup, parsed) +} + // Test that a handler associated with an APIService gets pinged after the // APIService has been marked as dirty func TestDirty(t *testing.T) { @@ -267,7 +327,7 @@ func TestDirty(t *testing.T) { testCtx, cancel := context.WithCancel(context.Background()) defer cancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) // 
immediately check for ping, since Run() should block for local services @@ -304,7 +364,7 @@ func TestWaitForSync(t *testing.T) { testCtx, cancel := context.WithCancel(context.Background()) defer cancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) // immediately check for ping, since Run() should block for local services @@ -351,7 +411,7 @@ func TestRemoveAPIService(t *testing.T) { testCtx, testCancel := context.WithCancel(context.Background()) defer testCancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) for _, s := range apiServices { aggregatedManager.RemoveAPIService(s.Name) @@ -527,7 +587,7 @@ func TestLegacyFallbackNoCache(t *testing.T) { testCtx, cancel := context.WithCancel(context.Background()) defer cancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) // At this point external services have synced. Check if discovery document @@ -665,7 +725,7 @@ func testLegacyFallbackWithCustomRootHandler(t *testing.T, rootHandlerFn func(ht testCtx, cancel := context.WithCancel(context.Background()) defer cancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) // At this point external services have synced. Check if discovery document @@ -745,7 +805,7 @@ func TestAPIServiceStale(t *testing.T) { testCtx, cancel := context.WithCancel(context.Background()) defer cancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) // At this point external services have synced. 
Check if discovery document @@ -807,7 +867,7 @@ func TestNotModified(t *testing.T) { testCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - go aggregatedManager.Run(testCtx.Done()) + go aggregatedManager.Run(testCtx.Done(), fakeCh()) // Important to wait here to ensure we prime the cache with the initial list // of documents in order to exercise 304 Not Modified diff --git a/test/integration/apiserver/discovery/discovery_test.go b/test/integration/apiserver/discovery/discovery_test.go index 71307c1378f..03f79cf5343 100644 --- a/test/integration/apiserver/discovery/discovery_test.go +++ b/test/integration/apiserver/discovery/discovery_test.go @@ -121,6 +121,18 @@ var ( }, } + basicTestGroupStale = apidiscoveryv2beta1.APIGroupDiscovery{ + ObjectMeta: metav1.ObjectMeta{ + Name: "stable.example.com", + }, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ + { + Version: "v1", + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessStale, + }, + }, + } + stableGroup = "stable.example.com" stableV1 = metav1.GroupVersion{Group: stableGroup, Version: "v1"} stableV1alpha1 = metav1.GroupVersion{Group: stableGroup, Version: "v1alpha1"} @@ -172,6 +184,64 @@ func setup(t *testing.T) (context.Context, testClientSet, context.CancelFunc) { return ctx, client, cancelCtx } +func TestReadinessAggregatedAPIServiceDiscovery(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)() + + // Keep any goroutines spawned from running past the execution of this test + ctx, client, cleanup := setup(t) + defer cleanup() + + // Create a resource manager which serves our GroupVersion + resourceManager := discoveryendpoint.NewResourceManager("apis") + resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup}) + + apiServiceWaitCh := make(chan struct{}) + + // Install our ResourceManager as an Aggregated APIService to the + // test server + 
service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasPrefix(r.URL.Path, "/apis/stable.example.com") { + // Return invalid response so APIService can be marked as "available" + w.WriteHeader(http.StatusOK) + } else if strings.HasPrefix(r.URL.Path, "/apis") { + select { + case <-apiServiceWaitCh: + // Hang responding to discovery until aggregated discovery document contains the aggregated group marked as Stale. + resourceManager.ServeHTTP(w, r) + case <-ctx.Done(): + return + } + } else { + // reject openapi/v2, openapi/v3, apis// + w.WriteHeader(http.StatusNotFound) + } + })) + go func() { + require.NoError(t, service.Run(ctx)) + }() + require.NoError(t, service.WaitForReady(ctx)) + + // For each groupversion served by our resourcemanager, create an APIService + // object connected to our fake APIServer + for _, versionInfo := range basicTestGroup.Versions { + groupVersion := metav1.GroupVersion{ + Group: basicTestGroup.Name, + Version: versionInfo.Version, + } + + require.NoError(t, registerAPIService(ctx, client, groupVersion, service)) + } + + // Keep repeatedly fetching document from aggregator. + // Check to see if it initially contains the aggregated group as stale + require.NoError(t, WaitForGroups(ctx, client, basicTestGroupStale)) + require.NoError(t, WaitForRootPaths(t, ctx, client, sets.New("/apis/"+basicTestGroup.Name), nil)) + + // Allow the APIService to start responding and ensure that Freshness is updated when the APIService is reachable. 
+ close(apiServiceWaitCh) + require.NoError(t, WaitForGroups(ctx, client, basicTestGroupWithFixup)) +} + func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service FakeService) error { port := service.Port() if port == nil { diff --git a/test/integration/apiserver/discovery/framework.go b/test/integration/apiserver/discovery/framework.go index f0642330db9..f47f3771be4 100644 --- a/test/integration/apiserver/discovery/framework.go +++ b/test/integration/apiserver/discovery/framework.go @@ -601,13 +601,18 @@ func WaitForRootPaths(t *testing.T, ctx context.Context, client testClient, requ func WaitForGroups(ctx context.Context, client testClient, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error { return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool { for _, searchGroup := range groups { + found := false for _, docGroup := range groupList.Items { if reflect.DeepEqual(searchGroup, docGroup) { - return true + found = true + break } } + if !found { + return false + } } - return false + return true }) }