Merge pull request #122883 from Jefftree/agg-discovery-block-stale

Aggregated Discovery, mark APIServices stale before initial health check
This commit is contained in:
Kubernetes Prow Robot 2024-02-21 12:02:41 -08:00 committed by GitHub
commit 3d3d705a6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 214 additions and 33 deletions

View File

@ -343,6 +343,42 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
s.discoveryAggregationController = NewDiscoveryManager(
// Use aggregator as the source name to avoid overwriting native/CRD
// groups
s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource),
)
// Setup discovery endpoint
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {
// Discovery aggregation depends on the apiservice registration controller
// having the full list of APIServices already synced
select {
case <-context.StopCh:
return nil
// Context cancelled, should abort/clean goroutines
case <-apiServiceRegistrationControllerInitiated:
}
// Run discovery manager's worker to watch for new/removed/updated
// APIServices to the discovery document can be updated at runtime
// When discovery is ready, all APIServices will be present, with APIServices
// that have not successfully synced discovery to be present but marked as Stale.
discoverySyncedCh := make(chan struct{})
go s.discoveryAggregationController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
return nil
// Context cancelled, should abort/clean goroutines
case <-discoverySyncedCh:
// API services successfully sync
}
return nil
})
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// Spawn a goroutine in aggregator apiserver to update storage version for
@ -414,22 +450,6 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
})
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
s.discoveryAggregationController = NewDiscoveryManager(
// Use aggregator as the source name to avoid overwriting native/CRD
// groups
s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource),
)
// Setup discovery endpoint
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {
// Run discovery manager's worker to watch for new/removed/updated
// APIServices to the discovery document can be updated at runtime
go s.discoveryAggregationController.Run(context.StopCh)
return nil
})
}
prepared := s.GenericAPIServer.PrepareRun()
// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers

View File

@ -67,9 +67,9 @@ type DiscoveryAggregationController interface {
// Thread-safe
RemoveAPIService(apiServiceName string)
// Spwans a worker which waits for added/updated apiservices and updates
// Spawns a worker which waits for added/updated apiservices and updates
// the unified discovery document by contacting the aggregated api services
Run(stopCh <-chan struct{})
Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{})
}
type discoveryManager struct {
@ -406,14 +406,47 @@ func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
return nil
}
// Spwans a goroutune which waits for added/updated apiservices and updates
func (dm *discoveryManager) getAPIServiceKeys() []string {
dm.servicesLock.RLock()
defer dm.servicesLock.RUnlock()
keys := []string{}
for key := range dm.apiServices {
keys = append(keys, key)
}
return keys
}
// Spawns a goroutine which waits for added/updated apiservices and updates
// the discovery document accordingly
func (dm *discoveryManager) Run(stopCh <-chan struct{}) {
func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{}) {
klog.Info("Starting ResourceDiscoveryManager")
// Shutdown the queue since stopCh was signalled
defer dm.dirtyAPIServiceQueue.ShutDown()
// Ensure that apiregistration.k8s.io is the first group in the discovery group.
dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
// Ensure that all APIServices are present before readiness check succeeds
var wg sync.WaitGroup
// Iterate on a copy of the keys to be thread safe with syncAPIService
keys := dm.getAPIServiceKeys()
for _, key := range keys {
wg.Add(1)
go func(k string) {
defer wg.Done()
// If an error was returned, the APIService will still have been
// added but marked as stale. Ignore the return value here
_ = dm.syncAPIService(k)
}(key)
}
wg.Wait()
if discoverySyncedCh != nil {
close(discoverySyncedCh)
}
// Spawn workers
// These workers wait for APIServices to be marked dirty.
// Worker ensures the cached discovery document hosted by the ServiceReference of
@ -440,9 +473,6 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}) {
}()
}
// Ensure that apiregistration.k8s.io is the first group in the discovery group.
dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
wait.PollUntil(1*time.Minute, func() (done bool, err error) {
dm.servicesLock.Lock()
defer dm.servicesLock.Unlock()

View File

@ -204,7 +204,7 @@ func TestBasic(t *testing.T) {
testCtx, testCancel := context.WithCancel(context.Background())
defer testCancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
@ -240,6 +240,62 @@ func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList,
}
}
// TestInitialRunHasAllAPIServices tests that when discovery is ready, all APIService
// are present and ones that have not synced are in the list as Stale.
func TestInitialRunHasAllAPIServices(t *testing.T) {
neverReturnCh := make(chan struct{})
defer close(neverReturnCh)
service := discoveryendpoint.NewResourceManager("apis")
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "v1.stable.example.com",
},
Spec: apiregistrationv1.APIServiceSpec{
Group: "stable.example.com",
Version: "v1",
Service: &apiregistrationv1.ServiceReference{
Name: "test-service",
},
},
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-neverReturnCh
service.ServeHTTP(w, r)
}))
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
initialSyncedCh := make(chan struct{})
go aggregatedManager.Run(testCtx.Done(), initialSyncedCh)
select {
case <-initialSyncedCh:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for initial sync")
}
response, _, parsed := fetchPath(aggregatedResourceManager, "")
if response.StatusCode != 200 {
t.Fatalf("unexpected status code %d", response.StatusCode)
}
apiGroup := apidiscoveryv2beta1.APIGroupDiscoveryList{Items: []apidiscoveryv2beta1.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "stable.example.com"},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
{
Version: "v1",
Freshness: "Stale",
},
},
},
}}
checkAPIGroups(t, apiGroup, parsed)
}
// Test that a handler associated with an APIService gets pinged after the
// APIService has been marked as dirty
func TestDirty(t *testing.T) {
@ -267,7 +323,7 @@ func TestDirty(t *testing.T) {
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// immediately check for ping, since Run() should block for local services
@ -304,7 +360,7 @@ func TestWaitForSync(t *testing.T) {
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// immediately check for ping, since Run() should block for local services
@ -351,7 +407,7 @@ func TestRemoveAPIService(t *testing.T) {
testCtx, testCancel := context.WithCancel(context.Background())
defer testCancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
for _, s := range apiServices {
aggregatedManager.RemoveAPIService(s.Name)
@ -527,7 +583,7 @@ func TestLegacyFallbackNoCache(t *testing.T) {
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// At this point external services have synced. Check if discovery document
@ -665,7 +721,7 @@ func testLegacyFallbackWithCustomRootHandler(t *testing.T, rootHandlerFn func(ht
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// At this point external services have synced. Check if discovery document
@ -745,7 +801,7 @@ func TestAPIServiceStale(t *testing.T) {
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
// At this point external services have synced. Check if discovery document
@ -807,7 +863,7 @@ func TestNotModified(t *testing.T) {
testCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go aggregatedManager.Run(testCtx.Done())
go aggregatedManager.Run(testCtx.Done(), nil)
// Important to wait here to ensure we prime the cache with the initial list
// of documents in order to exercise 304 Not Modified

View File

@ -121,6 +121,18 @@ var (
},
}
basicTestGroupStale = apidiscoveryv2beta1.APIGroupDiscovery{
ObjectMeta: metav1.ObjectMeta{
Name: "stable.example.com",
},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
{
Version: "v1",
Freshness: apidiscoveryv2beta1.DiscoveryFreshnessStale,
},
},
}
stableGroup = "stable.example.com"
stableV1 = metav1.GroupVersion{Group: stableGroup, Version: "v1"}
stableV1alpha1 = metav1.GroupVersion{Group: stableGroup, Version: "v1alpha1"}
@ -172,6 +184,64 @@ func setup(t *testing.T) (context.Context, testClientSet, context.CancelFunc) {
return ctx, client, cancelCtx
}
func TestReadinessAggregatedAPIServiceDiscovery(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
// Keep any goroutines spawned from running past the execution of this test
ctx, client, cleanup := setup(t)
defer cleanup()
// Create a resource manager whichs serves our GroupVersion
resourceManager := discoveryendpoint.NewResourceManager("apis")
resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup})
apiServiceWaitCh := make(chan struct{})
// Install our ResourceManager as an Aggregated APIService to the
// test server
service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/apis/stable.example.com") {
// Return invalid response so APIService can be marked as "available"
w.WriteHeader(http.StatusOK)
} else if strings.HasPrefix(r.URL.Path, "/apis") {
select {
case <-apiServiceWaitCh:
// Hang responding to discovery until aggregated discovery document contains the aggregated group marked as Stale.
resourceManager.ServeHTTP(w, r)
case <-ctx.Done():
return
}
} else {
// reject openapi/v2, openapi/v3, apis/<group>/<version>
w.WriteHeader(http.StatusNotFound)
}
}))
go func() {
require.NoError(t, service.Run(ctx))
}()
require.NoError(t, service.WaitForReady(ctx))
// For each groupversion served by our resourcemanager, create an APIService
// object connected to our fake APIServer
for _, versionInfo := range basicTestGroup.Versions {
groupVersion := metav1.GroupVersion{
Group: basicTestGroup.Name,
Version: versionInfo.Version,
}
require.NoError(t, registerAPIService(ctx, client, groupVersion, service))
}
// Keep repeatedly fetching document from aggregator.
// Check to see if it initially contains the aggregated group as stale
require.NoError(t, WaitForGroups(ctx, client, basicTestGroupStale))
require.NoError(t, WaitForRootPaths(t, ctx, client, sets.New("/apis/"+basicTestGroup.Name), nil))
// Allow the APIService to start responding and ensure that Freshness is updated when the APIService is reacheable.
close(apiServiceWaitCh)
require.NoError(t, WaitForGroups(ctx, client, basicTestGroupWithFixup))
}
func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service FakeService) error {
port := service.Port()
if port == nil {

View File

@ -601,13 +601,18 @@ func WaitForRootPaths(t *testing.T, ctx context.Context, client testClient, requ
func WaitForGroups(ctx context.Context, client testClient, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error {
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, searchGroup := range groups {
found := false
for _, docGroup := range groupList.Items {
if reflect.DeepEqual(searchGroup, docGroup) {
return true
found = true
break
}
}
if !found {
return false
}
}
return false
return true
})
}