diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 88e4ed71fbd..c35ac49094c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -67,7 +67,9 @@ type openAPISpecInfo struct { // The downloader is used only for non-local apiservices to // re-update the spec every so often. - downloader cached.Value[*spec.Swagger] + // Calling Get() is not thread safe and should only be called by a single + // thread via the openapi controller. + downloader CacheableDownloader } type specAggregator struct { @@ -93,8 +95,7 @@ func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggr for i, handler := range delegationHandlers { name := fmt.Sprintf(localDelegateChainNamePattern, i+1) - spec := NewCacheableDownloader(downloader, handler) - spec = decorateError(name, spec) + spec := NewCacheableDownloader(name, downloader, handler) s.addLocalSpec(name, spec) } @@ -218,16 +219,21 @@ func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler s.mutex.Lock() defer s.mutex.Unlock() - _, exists := s.specsByAPIServiceName[apiService.Name] + existingSpec, exists := s.specsByAPIServiceName[apiService.Name] if !exists { - s.specsByAPIServiceName[apiService.Name] = &openAPISpecInfo{ + specInfo := &openAPISpecInfo{ apiService: *apiService, - downloader: decorateError(apiService.Name, NewCacheableDownloader(s.downloader, handler)), + downloader: NewCacheableDownloader(apiService.Name, s.downloader, handler), } + specInfo.spec.Store(cached.Result[*spec.Swagger]{Err: fmt.Errorf("spec for apiservice %s is not yet available", apiService.Name)}) + s.specsByAPIServiceName[apiService.Name] = specInfo s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) + } else { + existingSpec.apiService = *apiService + existingSpec.downloader.UpdateHandler(handler) } - return s.updateServiceLocked(apiService.Name) + return nil } // RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. @@ -243,14 +249,3 @@ func (s *specAggregator) RemoveAPIService(apiServiceName string) { // Re-create the mergeSpec for the new list of apiservices s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) } - -// decorateError creates a new cache that wraps a downloader -// cache the name of the apiservice to help with debugging. -func decorateError(name string, cache cached.Value[*spec.Swagger]) cached.Value[*spec.Swagger] { - return cached.Transform(func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) { - if err != nil { - return nil, "", fmt.Errorf("failed to download %v: %v", name, err) - } - return result, etag, err - }, cache) -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go index c5ad4e1598f..1b366e12de1 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go @@ -25,6 +25,7 @@ import ( "time" "bytes" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/validation/spec" @@ -90,6 +91,9 @@ func TestAddUpdateAPIService(t *testing.T) { if err := s.AddUpdateAPIService(apiService, handler); err != nil { t.Error(err) } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -109,7 +113,9 @@ func TestAddUpdateAPIService(t *testing.T) { }, }, } - s.UpdateAPIServiceSpec(apiService.Name) + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } swagger, err = fetchOpenAPI(mux) if err != nil { @@ -158,6 +164,9 @@ func TestAddRemoveAPIService(t *testing.T) { if err := s.AddUpdateAPIService(apiService, handler); err != nil { t.Error(err) } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -178,6 +187,78 @@ func TestAddRemoveAPIService(t *testing.T) { expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") } +func TestUpdateAPIService(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiService := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiService.Name = "apiservice" + + handler := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/apiservicegroup/v1": {}, + }, + }, + }, + }} + + handler2 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{}, + }, + }, + }} + + if err := s.AddUpdateAPIService(apiService, handler); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") + + t.Logf("Updating APIService %s", apiService.Name) + if err := s.AddUpdateAPIService(apiService, handler2); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } + + swagger, err = fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + // Ensure that the if the APIService is added and then handler is modified, the new data is reflected in the aggregated OpenAPI. + expectNoPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") +} + func TestFailingAPIServiceSkippedAggregation(t *testing.T) { mux := http.NewServeMux() var delegationHandlers []http.Handler @@ -233,8 +314,19 @@ func TestFailingAPIServiceSkippedAggregation(t *testing.T) { }, } - s.AddUpdateAPIService(apiServiceFailed, handlerFailed) - s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess) + if err := s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess); err != nil { + t.Error(err) + } + if err := s.AddUpdateAPIService(apiServiceFailed, handlerFailed); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiServiceSuccess.Name); err != nil { + t.Error(err) + } + err := s.UpdateAPIServiceSpec(apiServiceFailed.Name) + if err == nil { + t.Errorf("Expected updating failing apiService %s to return error", apiServiceFailed.Name) + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -281,7 +373,12 @@ func TestAPIServiceFailSuccessTransition(t *testing.T) { }, } - s.AddUpdateAPIService(apiService, handler) + if err := s.AddUpdateAPIService(apiService, handler); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiService.Name); err == nil { + t.Errorf("Expected error for when updating spec for failing apiservice") + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -304,12 +401,75 @@ func TestAPIServiceFailSuccessTransition(t *testing.T) { expectPath(t, swagger, "/apis/apiservicegroup/v1") } +func TestFailingAPIServiceDoesNotBlockAdd(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiServiceFailed := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiServiceFailed.Name = "apiserviceFailed" + + // Create a handler that has a long response time and ensure that + // adding the APIService does not block. + handlerFailed := &openAPIHandler{ + delaySeconds: 5, + returnErr: true, + openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/failed/v1": {}, + }, + }, + }, + }, + } + + updateDone := make(chan bool) + go func() { + if err := s.AddUpdateAPIService(apiServiceFailed, handlerFailed); err != nil { + t.Error(err) + } + close(updateDone) + }() + + select { + case <-updateDone: + case <-time.After(2 * time.Second): + t.Errorf("AddUpdateAPIService affected by APIService response time") + } + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/foo/v1") + expectNoPath(t, swagger, "/apis/failed/v1") +} + type openAPIHandler struct { - openapi *spec.Swagger - returnErr bool + delaySeconds int + openapi *spec.Swagger + returnErr bool } func (o *openAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + time.Sleep(time.Duration(o.delaySeconds) * time.Second) if o.returnErr { w.WriteHeader(500) return diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go index c8898630e78..03721365805 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go @@ -21,32 +21,53 @@ import ( "fmt" "net/http" "strings" + "sync/atomic" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/kube-openapi/pkg/cached" "k8s.io/kube-openapi/pkg/validation/spec" ) +type CacheableDownloader interface { + UpdateHandler(http.Handler) + Get() (*spec.Swagger, string, error) +} + // cacheableDownloader is a downloader that will always return the data // and the etag. type cacheableDownloader struct { + name string downloader *Downloader - handler http.Handler - etag string - spec *spec.Swagger + // handler is the http Handler for the apiservice that can be replaced + handler atomic.Pointer[http.Handler] + etag string + spec *spec.Swagger } // NewCacheableDownloader creates a downloader that also returns the etag, making it useful to use as a cached dependency. -func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Value[*spec.Swagger] { - return &cacheableDownloader{ +func NewCacheableDownloader(apiServiceName string, downloader *Downloader, handler http.Handler) CacheableDownloader { + c := &cacheableDownloader{ + name: apiServiceName, downloader: downloader, - handler: handler, } + c.handler.Store(&handler) + return c +} +func (d *cacheableDownloader) UpdateHandler(handler http.Handler) { + d.handler.Store(&handler) } func (d *cacheableDownloader) Get() (*spec.Swagger, string, error) { - swagger, etag, status, err := d.downloader.Download(d.handler, d.etag) + spec, etag, err := d.get() + if err != nil { + return spec, etag, fmt.Errorf("failed to download %v: %v", d.name, err) + } + return spec, etag, err +} + +func (d *cacheableDownloader) get() (*spec.Swagger, string, error) { + h := *d.handler.Load() + swagger, etag, status, err := d.downloader.Download(h, d.etag) if err != nil { return nil, "", err } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go index 5f107150c8e..69f32f4aa86 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go @@ -139,7 +139,10 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService * // UpdateAPIService updates API Service's info and handler. func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) { - if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil { + if apiService.Spec.Service == nil { + return + } + if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(apiService.Name); err != nil { utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err)) } key := apiService.Name diff --git a/test/integration/apiserver/openapi/openapi_apiservice_test.go b/test/integration/apiserver/openapi/openapi_apiservice_test.go new file mode 100644 index 00000000000..01f6d20da55 --- /dev/null +++ b/test/integration/apiserver/openapi/openapi_apiservice_test.go @@ -0,0 +1,251 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "bytes" + "context" + "errors" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + kubernetes "k8s.io/client-go/kubernetes" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + "k8s.io/kube-openapi/pkg/validation/spec" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + testdiscovery "k8s.io/kubernetes/test/integration/apiserver/discovery" + "k8s.io/kubernetes/test/integration/framework" +) + +func TestSlowAPIServiceOpenAPIDoesNotBlockHealthCheck(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + etcd := framework.SharedEtcd() + setupServer := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, etcd) + client := generateTestClient(t, setupServer) + + service := testdiscovery.NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/openapi/v2" { + return + } + // Effectively let the APIService block until request timeout. + <-ctx.Done() + openapi := &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/wardle.example.com/v1alpha1": {}, + }, + }, + }, + } + data, err := openapi.MarshalJSON() + if err != nil { + t.Error(err) + } + http.ServeContent(w, r, "/openapi/v2", time.Now(), bytes.NewReader(data)) + })) + go func() { + require.NoError(t, service.Run(ctx)) + }() + require.NoError(t, service.WaitForReady(ctx)) + + groupVersion := metav1.GroupVersion{ + Group: "wardle.example.com", + Version: "v1alpha1", + } + + require.NoError(t, registerAPIService(ctx, client, groupVersion, service)) + + setupServer.TearDownFn() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, etcd) + t.Cleanup(server.TearDownFn) + client2 := generateTestClient(t, server) + + err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 1*time.Second, true, func(context.Context) (bool, error) { + var statusCode int + client2.AdmissionregistrationV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&statusCode) + if statusCode == 200 { + return true, nil + } + return false, nil + }) + require.NoError(t, err) +} + +func TestFetchingOpenAPIBeforeReady(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + t.Cleanup(server.TearDownFn) + client := generateTestClient(t, server) + + readyCh := make(chan bool) + defer close(readyCh) + go func() { + select { + case <-readyCh: + default: + _, _ = client.Discovery().RESTClient().Get().AbsPath("/openapi/v2").Do(context.TODO()).Raw() + } + }() + + service := testdiscovery.NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + openapi := &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/wardle.example.com/v1alpha1": {}, + }, + }, + }, + } + data, err := openapi.MarshalJSON() + if err != nil { + t.Error(err) + } + http.ServeContent(w, r, "/openapi/v2", time.Now(), bytes.NewReader(data)) + })) + go func() { + require.NoError(t, service.Run(ctx)) + }() + require.NoError(t, service.WaitForReady(ctx)) + + groupVersion := metav1.GroupVersion{ + Group: "wardle.example.com", + Version: "v1alpha1", + } + + require.NoError(t, registerAPIService(ctx, client, groupVersion, service)) + defer func() { + require.NoError(t, unregisterAPIService(ctx, client, groupVersion)) + }() + + err := wait.PollUntilContextTimeout(context.Background(), time.Millisecond*10, time.Second, true, func(context.Context) (bool, error) { + b, err := client.Discovery().RESTClient().Get().AbsPath("/openapi/v2").Do(context.TODO()).Raw() + require.NoError(t, err) + var openapi spec.Swagger + require.NoError(t, openapi.UnmarshalJSON(b)) + if _, ok := openapi.Paths.Paths["/apis/wardle.example.com/v1alpha1"]; ok { + return true, nil + } + return false, nil + }) + require.NoError(t, err) + +} + +// These definitions were copied from k8s.io/kubernetes/test/integation/apiserver/discovery +// and should be consolidated. +type kubeClientSet = kubernetes.Interface + +type aggegatorClientSet = aggregator.Interface + +type apiextensionsClientSet = apiextensions.Interface + +type dynamicClientset = dynamic.Interface + +type testClientSet struct { + kubeClientSet + aggegatorClientSet + apiextensionsClientSet + dynamicClientset +} + +type testClient interface { + kubernetes.Interface + aggregator.Interface + apiextensions.Interface + dynamic.Interface +} + +var _ testClient = testClientSet{} + +func (t testClientSet) Discovery() discovery.DiscoveryInterface { + return t.kubeClientSet.Discovery() +} + +func generateTestClient(t *testing.T, server *kubeapiservertesting.TestServer) testClient { + kubeClientSet, err := kubernetes.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + aggegatorClientSet, err := aggregator.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + apiextensionsClientSet, err := apiextensions.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + dynamicClientset, err := dynamic.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + client := testClientSet{ + kubeClientSet: kubeClientSet, + aggegatorClientSet: aggegatorClientSet, + apiextensionsClientSet: apiextensionsClientSet, + dynamicClientset: dynamicClientset, + } + return client +} + +func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service testdiscovery.FakeService) error { + port := service.Port() + if port == nil { + return errors.New("service not yet started") + } + // Register the APIService + patch := apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: gv.Version + "." + gv.Group, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "APIService", + APIVersion: "apiregistration.k8s.io/v1", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: gv.Group, + Version: gv.Version, + InsecureSkipTLSVerify: true, + GroupPriorityMinimum: 1000, + VersionPriority: 15, + Service: &apiregistrationv1.ServiceReference{ + Namespace: "default", + Name: service.Name(), + Port: port, + }, + }, + } + + _, err := client. + ApiregistrationV1(). + APIServices(). + Create(context.TODO(), &patch, metav1.CreateOptions{FieldManager: "test-manager"}) + return err +} + +func unregisterAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion) error { + return client.ApiregistrationV1().APIServices().Delete(ctx, gv.Version+"."+gv.Group, metav1.DeleteOptions{}) +}