diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go index 860806e6899..408d05cdee8 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go @@ -61,14 +61,10 @@ type DiscoveryAggregationController interface { // Spwans a worker which waits for added/updated apiservices and updates // the unified discovery document by contacting the aggregated api services Run(stopCh <-chan struct{}) - - // Returns true if all non-local APIServices that have been added - // are synced at least once to the discovery document - ExternalServicesSynced() bool } type discoveryManager struct { - // Locks `services` + // Locks `apiServices` servicesLock sync.RWMutex // Map from APIService's name (or a unique string for local servers) @@ -147,9 +143,6 @@ type groupVersionInfo struct { // was stored, the discovery document will always be re-fetched. lastMarkedDirty time.Time - // Last time sync function was run for this GV. - lastReconciled time.Time - // ServiceReference of this GroupVersion. This identifies the Service which // describes how to contact the server responsible for this GroupVersion. service serviceKey @@ -350,12 +343,8 @@ func (dm *discoveryManager) syncAPIService(apiServiceName string) error { } // Lookup last cached result for this apiservice's service. - now := time.Now() cached, err := dm.fetchFreshDiscoveryForService(mgv, info) - info.lastReconciled = now - dm.setInfoForAPIService(apiServiceName, &info) - var entry apidiscoveryv2beta1.APIVersionDiscovery // Extract the APIService's specific resource information from the @@ -477,18 +466,6 @@ func (dm *discoveryManager) RemoveAPIService(apiServiceName string) { } } -func (dm *discoveryManager) ExternalServicesSynced() bool { - dm.servicesLock.RLock() - defer dm.servicesLock.RUnlock() - for _, info := range dm.apiServices { - if info.lastReconciled.IsZero() { - return false - } - } - - return true -} - // // Lock-protected accessors // diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go index a7ef545fa55..f60508d3d3f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver_test +package apiserver import ( "context" @@ -37,9 +37,21 @@ import ( scheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" - "k8s.io/kube-aggregator/pkg/apiserver" ) +func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager { + return NewDiscoveryManager(rm).(*discoveryManager) +} + +// Returns true if the queue of services to sync empty this means everything has +// been reconciled and placed into merged document +func waitForEmptyQueue(stopCh <-chan struct{}, dm *discoveryManager) bool { + return cache.WaitForCacheSync(stopCh, func() bool { + // Once items have successfully synced they are removed from queue. + return dm.dirtyAPIServiceQueue.Len() == 0 + }) +} + // Test that the discovery manager starts and aggregates from two local API services func TestBasic(t *testing.T) { service1 := discoveryendpoint.NewResourceManager() @@ -49,7 +61,7 @@ func TestBasic(t *testing.T) { service1.SetGroups(apiGroup1.Items) service2.SetGroups(apiGroup2.Items) aggregatedResourceManager := discoveryendpoint.NewResourceManager() - aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + aggregatedManager := newDiscoveryManager(aggregatedResourceManager) for _, g := range apiGroup1.Items { for _, v := range g.Versions { @@ -85,10 +97,12 @@ func TestBasic(t *testing.T) { } } - testCtx, _ := context.WithCancel(context.Background()) + testCtx, testCancel := context.WithCancel(context.Background()) + defer testCancel() + go aggregatedManager.Run(testCtx.Done()) - cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager)) response, _, parsed := fetchPath(aggregatedResourceManager, "") if response.StatusCode != 200 { @@ -123,7 +137,8 @@ func TestDirty(t *testing.T) { service := discoveryendpoint.NewResourceManager() aggregatedResourceManager := discoveryendpoint.NewResourceManager() - aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + aggregatedManager := newDiscoveryManager(aggregatedResourceManager) + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ ObjectMeta: metav1.ObjectMeta{ Name: "v1.stable.example.com", @@ -143,7 +158,7 @@ func TestDirty(t *testing.T) { defer cancel() go aggregatedManager.Run(testCtx.Done()) - cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager)) // immediately check for ping, since Run() should block for local services if !pinged { @@ -180,20 +195,22 @@ func TestRemoveAPIService(t *testing.T) { } } - aggregatedManager := apiserver.NewDiscoveryManager(aggyService) + aggregatedManager := newDiscoveryManager(aggyService) for _, s := range apiServices { aggregatedManager.AddAPIService(s, service) } - testCtx, _ := context.WithCancel(context.Background()) + testCtx, testCancel := context.WithCancel(context.Background()) + defer testCancel() + go aggregatedManager.Run(testCtx.Done()) for _, s := range apiServices { aggregatedManager.RemoveAPIService(s.Name) } - cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager)) response, _, parsed := fetchPath(aggyService, "") if response.StatusCode != 200 { @@ -246,7 +263,7 @@ func TestLegacyFallback(t *testing.T) { } })) - aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + aggregatedManager := newDiscoveryManager(aggregatedResourceManager) aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ ObjectMeta: metav1.ObjectMeta{ Name: "v1.stable.example.com", @@ -275,7 +292,7 @@ func TestLegacyFallback(t *testing.T) { defer cancel() go aggregatedManager.Run(testCtx.Done()) - require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) + require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager)) // At this point external services have synced. Check if discovery document // includes the legacy resources @@ -329,8 +346,7 @@ func TestNotModified(t *testing.T) { } } - aggregatedManager := apiserver.NewDiscoveryManager(aggyService) - + aggregatedManager := newDiscoveryManager(aggyService) // Add all except the last group. // Ensure this is done BEFORE the call to run, so they are included in initial // list to keep test focused @@ -345,10 +361,10 @@ func TestNotModified(t *testing.T) { // Important to wait here to ensure we prime the cache with the initial list // of documents in order to exercise 304 Not Modified - require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) + require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager)) - // Now add all groups. We excluded one group before so that ExternalServicesSynced - // could include it in this round. Now, if ExternalServicesSynced ever returns + // Now add all groups. We excluded one group before so that AllServicesSynced + // could include it in this round. Now, if AllServicesSynced ever returns // true, it must have synced all the pre-existing groups before, which would // return 304 Not Modified for _, s := range apiServices { @@ -356,7 +372,7 @@ func TestNotModified(t *testing.T) { } // This would wait the full timeout on 1.26.0. - require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) + require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager)) } // copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go