diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 4025281e1fc..540b011e476 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -146,6 +146,13 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(), autoRegistrationController) + // Imbue all builtin group-priorities onto the aggregated discovery + if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { + for gv, entry := range apiVersionPriorities { + aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupPriority(gv.Group, int(entry.group)) + } + } + err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error { go crdRegistrationController.Run(5, context.StopCh) go func() { diff --git a/staging/src/k8s.io/kube-aggregator/go.mod b/staging/src/k8s.io/kube-aggregator/go.mod index aba67c07b58..e326b55dc6b 100644 --- a/staging/src/k8s.io/kube-aggregator/go.mod +++ b/staging/src/k8s.io/kube-aggregator/go.mod @@ -8,6 +8,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/emicklei/go-restful/v3 v3.9.0 github.com/gogo/protobuf v1.3.2 + github.com/google/gofuzz v1.1.0 github.com/spf13/cobra v1.6.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 @@ -44,7 +45,6 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/google/gofuzz v1.1.0 // indirect github.com/google/uuid v1.1.2 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index d60f8df9f66..4fd206cc89a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" @@ -154,6 +155,11 @@ type APIAggregator struct { // openAPIV3AggregationController downloads and caches OpenAPI v3 specs. openAPIV3AggregationController *openapiv3controller.AggregationController + // discoveryAggregationController downloads and caches discovery documents + // from all aggregated apiservices so they are available from /apis endpoint + // when discovery with resources are requested + discoveryAggregationController DiscoveryAggregationController + // egressSelector selects the proper egress dialer to communicate with the custom apiserver // overwrites proxyTransport dialer if not nil egressSelector *egressselector.EgressSelector @@ -244,7 +250,13 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg lister: s.lister, discoveryGroup: discoveryGroup(enabledVersions), } - s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler) + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + apisHandlerWithAggregationSupport := aggregated.WrapAggregatedDiscoveryToHandler(apisHandler, s.GenericAPIServer.AggregatedDiscoveryGroupManager) + s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandlerWithAggregationSupport) + } else { + s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler) + } s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler) apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s) @@ -365,8 +377,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return s, nil } -// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec and calling -// the generic PrepareRun. +// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec & +// aggregated discovery document and calling the generic PrepareRun. func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { // add post start hook before generic PrepareRun in order to be before /healthz installation if s.openAPIConfig != nil { @@ -383,6 +395,20 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { }) } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + s.discoveryAggregationController = NewDiscoveryManager( + s.GenericAPIServer.AggregatedDiscoveryGroupManager, + ) + + // Setup discovery endpoint + s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error { + // Run discovery manager's worker to watch for new/removed/updated + // APIServices to the discovery document can be updated at runtime + go s.discoveryAggregationController.Run(context.StopCh) + return nil + }) + } + prepared := s.GenericAPIServer.PrepareRun() // delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers @@ -432,6 +458,12 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { if s.openAPIV3AggregationController != nil { s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService) } + // Forward calls to discovery manager to update discovery document + if s.discoveryAggregationController != nil { + handlerCopy := *proxyHandler + handlerCopy.setServiceAvailable(true) + s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy) + } return nil } @@ -457,6 +489,10 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { if s.openAPIV3AggregationController != nil { s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService) } + if s.discoveryAggregationController != nil { + s.discoveryAggregationController.AddAPIService(apiService, proxyHandler) + } + s.proxyHandlers[apiService.Name] = proxyHandler s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler) @@ -489,6 +525,11 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { // RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please. // It's a slow moving API, so it's ok to run the controller on a single thread. func (s *APIAggregator) RemoveAPIService(apiServiceName string) { + // Forward calls to discovery manager to update discovery document + if s.discoveryAggregationController != nil { + s.discoveryAggregationController.RemoveAPIService(apiServiceName) + } + version := v1helper.APIServiceNameToGroupVersion(apiServiceName) proxyPath := "/apis/" + version.Group + "/" + version.Version diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go new file mode 100644 index 00000000000..41d6e0be9b0 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go @@ -0,0 +1,572 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "errors" + "fmt" + "net/http" + "sync" + "time" + + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" + "k8s.io/apiserver/pkg/endpoints/request" + scheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" +) + +var APIRegistrationGroup string = "apiregistration.k8s.io" +var APIRegistrationGroupPriority int = 18000 + +// Given a list of APIServices and proxyHandlers for contacting them, +// DiscoveryManager caches a list of discovery documents for each server + +type DiscoveryAggregationController interface { + // Adds or Updates an APIService from the Aggregated Discovery Controller's + // knowledge base + // Thread-safe + AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) + + // Removes an APIService from the Aggregated Discovery Controller's Knowledge + // bank + // Thread-safe + RemoveAPIService(apiServiceName string) + + // Spwans a worker which waits for added/updated apiservices and updates + // the unified discovery document by contacting the aggregated api services + Run(stopCh <-chan struct{}) + + // Returns true if all non-local APIServices that have been added + // are synced at least once to the discovery document + ExternalServicesSynced() bool +} + +type discoveryManager struct { + // Locks `services` + servicesLock sync.RWMutex + + // Map from APIService's name (or a unique string for local servers) + // to information about contacting that API Service + apiServices map[string]groupVersionInfo + + // Locks cachedResults + resultsLock sync.RWMutex + + // Map from APIService.Spec.Service to the previously fetched value + // (Note that many APIServices might use the same APIService.Spec.Service) + cachedResults map[serviceKey]cachedResult + + // Queue of dirty apiServiceKey which need to be refreshed + // It is important that the reconciler for this queue does not excessively + // contact the apiserver if a key was enqueued before the server was last + // contacted. + dirtyAPIServiceQueue workqueue.RateLimitingInterface + + // Merged handler which stores all known groupversions + mergedDiscoveryHandler discoveryendpoint.ResourceManager +} + +// Version of Service/Spec with relevant fields for use as a cache key +type serviceKey struct { + Namespace string + Name string + Port int32 +} + +// Human-readable String representation used for logs +func (s serviceKey) String() string { + return fmt.Sprintf("%v/%v:%v", s.Namespace, s.Name, s.Port) +} + +func newServiceKey(service apiregistrationv1.ServiceReference) serviceKey { + // Docs say. Defaults to 443 for compatibility reasons. + // BETA: Should this be a shared constant to avoid drifting with the + // implementation? + port := int32(443) + if service.Port != nil { + port = *service.Port + } + + return serviceKey{ + Name: service.Name, + Namespace: service.Namespace, + Port: port, + } +} + +type cachedResult struct { + // Currently cached discovery document for this service + // Map from group name to version name to + discovery map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery + + // ETag hash of the cached discoveryDocument + etag string + + // Guaranteed to be a time less than the time the server responded with the + // discovery data. + lastUpdated time.Time +} + +// Information about a specific APIService/GroupVersion +type groupVersionInfo struct { + // Date this APIService was marked dirty. + // Guaranteed to be a time greater than the most recent time the APIService + // was known to be modified. + // + // Used for request deduplication to ensure the data used to reconcile each + // apiservice was retrieved after the time of the APIService change: + // real_apiservice_change_time < groupVersionInfo.lastMarkedDirty < cachedResult.lastUpdated < real_document_fresh_time + // + // This ensures that if the apiservice was changed after the last cached entry + // was stored, the discovery document will always be re-fetched. + lastMarkedDirty time.Time + + // Last time sync function was run for this GV. + lastReconciled time.Time + + // ServiceReference of this GroupVersion. This identifies the Service which + // describes how to contact the server responsible for this GroupVersion. + service serviceKey + + // groupPriority describes the priority of the APIService for sorting + groupPriority int + + // Method for contacting the service + handler http.Handler +} + +var _ DiscoveryAggregationController = &discoveryManager{} + +func NewDiscoveryManager( + target discoveryendpoint.ResourceManager, +) DiscoveryAggregationController { + return &discoveryManager{ + mergedDiscoveryHandler: target, + apiServices: make(map[string]groupVersionInfo), + cachedResults: make(map[serviceKey]cachedResult), + dirtyAPIServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "discovery-manager"), + } +} + +// Returns discovery data for the given apiservice. +// Caches the result. +// Returns the cached result if it is retrieved after the apiservice was last +// marked dirty +// If there was an error in fetching, returns the stale cached result if it exists, +// and a non-nil error +// If the result is current, returns nil error and non-nil result +func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion, info groupVersionInfo) (*cachedResult, error) { + // Lookup last cached result for this apiservice's service. + cached, exists := dm.getCacheEntryForService(info.service) + + // If entry exists and was updated after the given time, just stop now + if exists && cached.lastUpdated.After(info.lastMarkedDirty) { + return &cached, nil + } + + // If we have a handler to contact the server for this APIService, and + // the cache entry is too old to use, refresh the cache entry now. + handler := http.TimeoutHandler(info.handler, 5*time.Second, "request timed out") + req, err := http.NewRequest("GET", "/apis", nil) + if err != nil { + // NewRequest should not fail, but if it does for some reason, + // log it and continue + return &cached, fmt.Errorf("failed to create http.Request: %v", err) + } + + // Apply aggregator user to request + req = req.WithContext( + request.WithUser( + req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator", Groups: []string{"system:masters"}})) + req = req.WithContext(request.WithRequestInfo(req.Context(), &request.RequestInfo{ + Path: req.URL.Path, + IsResourceRequest: false, + })) + req.Header.Add("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") + + if exists && len(cached.etag) > 0 { + req.Header.Add("If-None-Match", cached.etag) + } + + // Important that the time recorded in the data's "lastUpdated" is conservatively + // from BEFORE the request is dispatched so that lastUpdated can be used to + // de-duplicate requests. + now := time.Now() + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + + switch writer.respCode { + case http.StatusNotModified: + dm.resultsLock.Lock() + defer dm.resultsLock.Unlock() + + // Keep old entry, update timestamp + cached = cachedResult{ + discovery: cached.discovery, + etag: cached.etag, + lastUpdated: now, + } + + dm.setCacheEntryForService(info.service, cached) + return &cached, nil + case http.StatusNotFound: + // Discovery Document is not being served at all. + // Fall back to legacy discovery information + if len(gv.Version) == 0 { + return nil, errors.New("not found") + } + + var path string + if len(gv.Group) == 0 { + path = "/api/" + gv.Version + } else { + path = "/apis/" + gv.Group + "/" + gv.Version + } + + req, err := http.NewRequest("GET", path, nil) + if err != nil { + // NewRequest should not fail, but if it does for some reason, + // log it and continue + return nil, fmt.Errorf("failed to create http.Request: %v", err) + } + + // Apply aggregator user to request + req = req.WithContext( + request.WithUser( + req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator"})) + + // req.Header.Add("Accept", runtime.ContentTypeProtobuf) + req.Header.Add("Accept", runtime.ContentTypeJSON) + + if exists && len(cached.etag) > 0 { + req.Header.Add("If-None-Match", cached.etag) + } + + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + + if writer.respCode != http.StatusOK { + return nil, fmt.Errorf("failed to download discovery for %s: %v", path, writer.String()) + } + + parsed := &metav1.APIResourceList{} + if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { + return nil, err + } + + // Create a discomap with single group-version + resources, err := endpoints.ConvertGroupVersionIntoToDiscovery(parsed.APIResources) + if err != nil { + return nil, err + } + + discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{ + // Convert old-style APIGroupList to new information + gv: { + Version: gv.Version, + Resources: resources, + }, + } + + cached = cachedResult{ + discovery: discoMap, + lastUpdated: now, + } + + // Save the resolve, because it is still useful in case other services + // are already marked dirty. THey can use it without making http request + dm.setCacheEntryForService(info.service, cached) + return &cached, nil + + case http.StatusOK: + parsed := &apidiscoveryv2beta1.APIGroupDiscoveryList{} + if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { + return nil, err + } + klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String()) + + // Convert discovery info into a map for convenient lookup later + discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{} + for _, g := range parsed.Items { + for _, v := range g.Versions { + discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v + } + } + + // Save cached result + cached = cachedResult{ + discovery: discoMap, + etag: writer.Header().Get("Etag"), + lastUpdated: now, + } + dm.setCacheEntryForService(info.service, cached) + return &cached, nil + + default: + klog.Infof("DiscoveryManager: Failed to download discovery for %v: %v %s", + info.service.String(), writer.respCode, writer.data) + return nil, fmt.Errorf("service %s returned non-success response code: %v", + info.service.String(), writer.respCode) + } +} + +// Try to sync a single APIService. +func (dm *discoveryManager) syncAPIService(apiServiceName string) error { + info, exists := dm.getInfoForAPIService(apiServiceName) + + gv := helper.APIServiceNameToGroupVersion(apiServiceName) + mgv := metav1.GroupVersion{Group: gv.Group, Version: gv.Version} + + if !exists { + // apiservice was removed. remove it from merged discovery + dm.mergedDiscoveryHandler.RemoveGroupVersion(mgv) + return nil + } + + // Lookup last cached result for this apiservice's service. + now := time.Now() + cached, err := dm.fetchFreshDiscoveryForService(mgv, info) + + info.lastReconciled = now + dm.setInfoForAPIService(apiServiceName, &info) + + var entry apidiscoveryv2beta1.APIVersionDiscovery + + // Extract the APIService's specific resource information from the + // groupversion + if cached == nil { + // There was an error fetching discovery for this APIService, and + // there is nothing in the cache for this GV. + // + // Just use empty GV to mark that GV exists, but no resources. + // Also mark that it is stale to indicate the fetch failed + // TODO: Maybe also stick in a status for the version the error? + entry = apidiscoveryv2beta1.APIVersionDiscovery{ + Version: gv.Version, + } + } else { + // Find our specific groupversion within the discovery document + entry, exists = cached.discovery[mgv] + if exists { + // The stale/fresh entry has our GV, so we can include it in the doc + } else { + // Successfully fetched discovery information from the server, but + // the server did not include this groupversion? + entry = apidiscoveryv2beta1.APIVersionDiscovery{ + Version: gv.Version, + } + } + } + + // The entry's staleness depends upon if `fetchFreshDiscoveryForService` + // returned an error or not. + if err == nil { + entry.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessCurrent + } else { + entry.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale + } + + dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry) + return nil +} + +// Spwans a goroutune which waits for added/updated apiservices and updates +// the discovery document accordingly +func (dm *discoveryManager) Run(stopCh <-chan struct{}) { + klog.Info("Starting ResourceDiscoveryManager") + + // Shutdown the queue since stopCh was signalled + defer dm.dirtyAPIServiceQueue.ShutDown() + + // Spawn workers + // These workers wait for APIServices to be marked dirty. + // Worker ensures the cached discovery document hosted by the ServiceReference of + // the APIService is at least as fresh as the APIService, then includes the + // APIService's groupversion into the merged document + for i := 0; i < 2; i++ { + go func() { + for { + next, shutdown := dm.dirtyAPIServiceQueue.Get() + if shutdown { + return + } + + func() { + defer dm.dirtyAPIServiceQueue.Done(next) + + if err := dm.syncAPIService(next.(string)); err != nil { + dm.dirtyAPIServiceQueue.AddRateLimited(next) + } else { + dm.dirtyAPIServiceQueue.Forget(next) + } + }() + } + }() + } + + // Ensure that apiregistration.k8s.io is the first group in the discovery group. + dm.mergedDiscoveryHandler.SetGroupPriority(APIRegistrationGroup, APIRegistrationGroupPriority) + + wait.PollUntil(1*time.Minute, func() (done bool, err error) { + dm.servicesLock.Lock() + defer dm.servicesLock.Unlock() + + now := time.Now() + + // Mark all non-local APIServices as dirty + for key, info := range dm.apiServices { + info.lastMarkedDirty = now + dm.apiServices[key] = info + dm.dirtyAPIServiceQueue.Add(key) + } + return false, nil + }, stopCh) +} + +// Adds an APIService to be tracked by the discovery manager. If the APIService +// is already known +func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) { + // If service is nil then its information is contained by a local APIService + // which is has already been added to the manager. + if apiService.Spec.Service == nil { + return + } + + // Add or update APIService record and mark it as dirty + dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{ + groupPriority: int(apiService.Spec.GroupPriorityMinimum), + handler: handler, + lastMarkedDirty: time.Now(), + service: newServiceKey(*apiService.Spec.Service), + }) + dm.dirtyAPIServiceQueue.Add(apiService.Name) +} + +func (dm *discoveryManager) RemoveAPIService(apiServiceName string) { + if dm.setInfoForAPIService(apiServiceName, nil) != nil { + // mark dirty if there was actually something deleted + dm.dirtyAPIServiceQueue.Add(apiServiceName) + } +} + +func (dm *discoveryManager) ExternalServicesSynced() bool { + dm.servicesLock.RLock() + defer dm.servicesLock.RUnlock() + for _, info := range dm.apiServices { + if info.lastReconciled.IsZero() { + return false + } + } + + return true +} + +// +// Lock-protected accessors +// + +func (dm *discoveryManager) getCacheEntryForService(key serviceKey) (cachedResult, bool) { + dm.resultsLock.RLock() + defer dm.resultsLock.RUnlock() + + result, ok := dm.cachedResults[key] + return result, ok +} + +func (dm *discoveryManager) setCacheEntryForService(key serviceKey, result cachedResult) { + dm.resultsLock.Lock() + defer dm.resultsLock.Unlock() + + dm.cachedResults[key] = result +} + +func (dm *discoveryManager) getInfoForAPIService(name string) (groupVersionInfo, bool) { + dm.servicesLock.RLock() + defer dm.servicesLock.RUnlock() + + result, ok := dm.apiServices[name] + return result, ok +} + +func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersionInfo) (oldValueIfExisted *groupVersionInfo) { + dm.servicesLock.Lock() + defer dm.servicesLock.Unlock() + + if oldValue, exists := dm.apiServices[name]; exists { + oldValueIfExisted = &oldValue + } + + if result != nil { + dm.apiServices[name] = *result + } else { + delete(dm.apiServices, name) + } + + return oldValueIfExisted +} + +// !TODO: This was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go +// which was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go +// so we should find a home for this +// inMemoryResponseWriter is a http.Writer that keep the response in memory. +type inMemoryResponseWriter struct { + writeHeaderCalled bool + header http.Header + respCode int + data []byte +} + +func newInMemoryResponseWriter() *inMemoryResponseWriter { + return &inMemoryResponseWriter{header: http.Header{}} +} + +func (r *inMemoryResponseWriter) Header() http.Header { + return r.header +} + +func (r *inMemoryResponseWriter) WriteHeader(code int) { + r.writeHeaderCalled = true + r.respCode = code +} + +func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { + if !r.writeHeaderCalled { + r.WriteHeader(http.StatusOK) + } + r.data = append(r.data, in...) + return len(in), nil +} + +func (r *inMemoryResponseWriter) String() string { + s := fmt.Sprintf("ResponseCode: %d", r.respCode) + if r.data != nil { + s += fmt.Sprintf(", Body: %s", string(r.data)) + } + if r.header != nil { + s += fmt.Sprintf(", Header: %s", r.header) + } + return s +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go new file mode 100644 index 00000000000..2b7a94f5c42 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go @@ -0,0 +1,359 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver_test + +import ( + "context" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + + fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/require" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints" + "k8s.io/apiserver/pkg/endpoints/discovery" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" + scheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/apiserver" +) + +// Test that the discovery manager starts and aggregates from two local API services +func TestBasic(t *testing.T) { + service1 := discoveryendpoint.NewResourceManager() + service2 := discoveryendpoint.NewResourceManager() + apiGroup1 := fuzzAPIGroups(2, 5, 25) + apiGroup2 := fuzzAPIGroups(2, 5, 50) + service1.SetGroups(apiGroup1.Items) + service2.SetGroups(apiGroup2.Items) + aggregatedResourceManager := discoveryendpoint.NewResourceManager() + aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + + for _, g := range apiGroup1.Items { + for _, v := range g.Versions { + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: v.Version + "." + g.Name, + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: g.Name, + Version: v.Version, + Service: &apiregistrationv1.ServiceReference{ + Name: "service1", + }, + }, + }, service1) + } + } + + for _, g := range apiGroup2.Items { + for _, v := range g.Versions { + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: v.Version + "." + g.Name, + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: g.Name, + Version: v.Version, + Service: &apiregistrationv1.ServiceReference{ + Name: "service2", + }, + }, + }, service2) + } + } + + testCtx, _ := context.WithCancel(context.Background()) + go aggregatedManager.Run(testCtx.Done()) + + cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + + response, _, parsed := fetchPath(aggregatedResourceManager, "") + if response.StatusCode != 200 { + t.Fatalf("unexpected status code %d", response.StatusCode) + } + checkAPIGroups(t, apiGroup1, parsed) + checkAPIGroups(t, apiGroup2, parsed) +} + +func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList, response *apidiscoveryv2beta1.APIGroupDiscoveryList) { + if len(response.Items) < len(api.Items) { + t.Errorf("expected to check for at least %d groups, only have %d groups in response", len(api.Items), len(response.Items)) + } + for _, knownGroup := range api.Items { + found := false + for _, possibleGroup := range response.Items { + if knownGroup.Name == possibleGroup.Name { + t.Logf("found %s", knownGroup.Name) + found = true + } + } + if found == false { + t.Errorf("could not find %s", knownGroup.Name) + } + } +} + +// Test that a handler associated with an APIService gets pinged after the +// APIService has been marked as dirty +func TestDirty(t *testing.T) { + pinged := false + service := discoveryendpoint.NewResourceManager() + aggregatedResourceManager := discoveryendpoint.NewResourceManager() + + aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service", + }, + }, + }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + pinged = true + service.ServeHTTP(w, r) + })) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go aggregatedManager.Run(testCtx.Done()) + cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + + // immediately check for ping, since Run() should block for local services + if !pinged { + t.Errorf("service handler never pinged") + } +} + +// Show that an APIService can be removed and that its group no longer remains +// if there are no versions +func TestRemoveAPIService(t *testing.T) { + aggyService := discoveryendpoint.NewResourceManager() + service := discoveryendpoint.NewResourceManager() + apiGroup := fuzzAPIGroups(2, 3, 10) + service.SetGroups(apiGroup.Items) + + var apiServices []*apiregistrationv1.APIService + for _, g := range apiGroup.Items { + for _, v := range g.Versions { + apiservice := &apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: v.Version + "." + g.Name, + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: g.Name, + Version: v.Version, + Service: &apiregistrationv1.ServiceReference{ + Namespace: "serviceNamespace", + Name: "serviceName", + }, + }, + } + + apiServices = append(apiServices, apiservice) + } + } + + aggregatedManager := apiserver.NewDiscoveryManager(aggyService) + + for _, s := range apiServices { + aggregatedManager.AddAPIService(s, service) + } + + testCtx, _ := context.WithCancel(context.Background()) + go aggregatedManager.Run(testCtx.Done()) + + for _, s := range apiServices { + aggregatedManager.RemoveAPIService(s.Name) + } + + cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + + response, _, parsed := fetchPath(aggyService, "") + if response.StatusCode != 200 { + t.Fatalf("unexpected status code %d", response.StatusCode) + } + if len(parsed.Items) > 0 { + t.Errorf("expected to find no groups after service deletion (got %d groups)", len(parsed.Items)) + } +} + +func TestLegacyFallback(t *testing.T) { + aggregatedResourceManager := discoveryendpoint.NewResourceManager() + + legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{ + Name: "stable.example.com", + PreferredVersion: metav1.GroupVersionForDiscovery{ + GroupVersion: "stable.example.com/v1", + Version: "v1", + }, + Versions: []metav1.GroupVersionForDiscovery{ + { + GroupVersion: "stable.example.com/v1", + Version: "v1", + }, + { + GroupVersion: "stable.example.com/v1beta1", + Version: "v1beta1", + }, + }, + }) + + resource := metav1.APIResource{ + Name: "foos", + SingularName: "foo", + Group: "stable.example.com", + Version: "v1", + Namespaced: false, + Kind: "Foo", + Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"}, + Categories: []string{"all"}, + } + + legacyResourceHandler := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{ + Group: "stable.example.com", + Version: "v1", + }, discovery.APIResourceListerFunc(func() []metav1.APIResource { + return []metav1.APIResource{ + resource, + } + })) + + aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service", + }, + }, + }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/apis/stable.example.com" { + legacyGroupHandler.ServeHTTP(w, r) + } else if r.URL.Path == "/apis/stable.example.com/v1" { + // defer to legacy discovery + legacyResourceHandler.ServeHTTP(w, r) + } else { + // Unknown url + w.WriteHeader(http.StatusNotFound) + } + })) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go aggregatedManager.Run(testCtx.Done()) + require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) + + // At this point external services have synced. Check if discovery document + // includes the legacy resources + _, _, doc := fetchPath(aggregatedResourceManager, "") + + converted, err := endpoints.ConvertGroupVersionIntoToDiscovery([]metav1.APIResource{resource}) + require.NoError(t, err) + require.Equal(t, []apidiscoveryv2beta1.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: resource.Group, + }, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ + { + Version: resource.Version, + Resources: converted, + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, + }, + }, + }, + }, doc.Items) +} + +// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go +func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2beta1.APIGroupDiscoveryList { + fuzzer := fuzz.NewWithSeed(seed) + fuzzer.NumElements(atLeastNumGroups, maxNumGroups) + fuzzer.NilChance(0) + fuzzer.Funcs(func(o *apidiscoveryv2beta1.APIGroupDiscovery, c fuzz.Continue) { + c.FuzzNoCustom(o) + + // The ResourceManager will just not serve the grouop if its versions + // list is empty + atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{} + c.Fuzz(&atLeastOne) + o.Versions = append(o.Versions, atLeastOne) + + o.TypeMeta = metav1.TypeMeta{ + Kind: "APIGroupDiscovery", + APIVersion: "v1", + } + }) + + var apis []apidiscoveryv2beta1.APIGroupDiscovery + fuzzer.Fuzz(&apis) + + return apidiscoveryv2beta1.APIGroupDiscoveryList{ + TypeMeta: metav1.TypeMeta{ + Kind: "APIGroupDiscoveryList", + APIVersion: "v1", + }, + Items: apis, + } + +} + +// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go +func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apidiscoveryv2beta1.APIGroupDiscoveryList) { + // Expect json-formatted apis group list + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/apis", nil) + + // Ask for JSON response + req.Header.Set("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") + + if etag != "" { + // Quote provided etag if unquoted + quoted := etag + if !strings.HasPrefix(etag, "\"") { + quoted = strconv.Quote(etag) + } + req.Header.Set("If-None-Match", quoted) + } + + handler.ServeHTTP(w, req) + + bytes := w.Body.Bytes() + var decoded *apidiscoveryv2beta1.APIGroupDiscoveryList + if len(bytes) > 0 { + decoded = &apidiscoveryv2beta1.APIGroupDiscoveryList{} + runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), bytes, decoded) + } + + return w.Result(), bytes, decoded +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 72feef9ebe0..d1c6597c5ab 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -231,6 +231,14 @@ func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) { // these methods provide locked access to fields +// Sets serviceAvailable value on proxyHandler +// not thread safe +func (r *proxyHandler) setServiceAvailable(value bool) { + info := r.handlingInfo.Load().(proxyHandlingInfo) + info.serviceAvailable = true + r.handlingInfo.Store(info) +} + func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) { if apiService.Spec.Service == nil { r.handlingInfo.Store(proxyHandlingInfo{local: true})