From aefaf66d2b45cecff522cdfeb7920a19c9014b52 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Tue, 21 Mar 2023 19:32:09 -0700 Subject: [PATCH] remove unused service keys from aggregated discovery --- .../pkg/apiserver/handler_discovery.go | 33 ++++++++++++ .../pkg/apiserver/handler_discovery_test.go | 52 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go index b6f07ff1459..953e93b754b 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints" @@ -489,6 +490,36 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<- }, stopCh) } +// Takes a snapshot of all currently used services by known APIServices and +// purges the cache entries of those not present in the snapshot. +func (dm *discoveryManager) removeUnusedServices() { + usedServiceKeys := sets.Set[serviceKey]{} + + func() { + dm.servicesLock.Lock() + defer dm.servicesLock.Unlock() + + // Mark all non-local APIServices as dirty + for _, info := range dm.apiServices { + usedServiceKeys.Insert(info.service) + } + }() + + // Avoids double lock. It is okay if a service is added/removed between these + // functions. This is just a cache and that should be infrequent. + + func() { + dm.resultsLock.Lock() + defer dm.resultsLock.Unlock() + + for key := range dm.cachedResults { + if !usedServiceKeys.Has(key) { + delete(dm.cachedResults, key) + } + } + }() +} + // Adds an APIService to be tracked by the discovery manager. If the APIService // is already known func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) { @@ -506,12 +537,14 @@ func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIServi lastMarkedDirty: time.Now(), service: newServiceKey(*apiService.Spec.Service), }) + dm.removeUnusedServices() dm.dirtyAPIServiceQueue.Add(apiService.Name) } func (dm *discoveryManager) RemoveAPIService(apiServiceName string) { if dm.setInfoForAPIService(apiServiceName, nil) != nil { // mark dirty if there was actually something deleted + dm.removeUnusedServices() dm.dirtyAPIServiceQueue.Add(apiServiceName) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go index 544023ac91d..5f39805a551 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go @@ -296,6 +296,58 @@ func TestInitialRunHasAllAPIServices(t *testing.T) { checkAPIGroups(t, apiGroup, parsed) } +func TestServiceGC(t *testing.T) { + service := discoveryendpoint.NewResourceManager("apis") + + aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis") + aggregatedManager := newDiscoveryManager(aggregatedResourceManager) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go aggregatedManager.Run(testCtx.Done(), nil) + + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service", + }, + }, + }, service) + + require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) + + // Lookup size of cache + getCacheLen := func() int { + aggregatedManager.resultsLock.Lock() + defer aggregatedManager.resultsLock.Unlock() + return len(aggregatedManager.cachedResults) + } + + require.Equal(t, 1, getCacheLen()) + + // Change the service of the same APIService a bit to create duplicate entry + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service-changed", + }, + }, + }, service) + + require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) + require.Equal(t, 1, getCacheLen()) +} + // Test that a handler associated with an APIService gets pinged after the // APIService has been marked as dirty func TestDirty(t *testing.T) {