From 721897871697db007c2439ac298c579c0f201388 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 24 Feb 2020 15:36:08 -0800 Subject: [PATCH] Add a generic filter that blocks certain write requests before StorageVersions are updated during apiserver bootstrap. Also add a poststarthook to the aggregator which updates the StorageVersions via the storageversion.Manager --- cmd/kube-apiserver/app/aggregator.go | 8 ++ .../pkg/endpoints/filters/storageversion.go | 80 +++++++++++++++++++ .../handlers/responsewriters/errors.go | 7 ++ .../src/k8s.io/apiserver/pkg/server/config.go | 19 ++++- .../pkg/apiserver/apiserver.go | 31 +++++++ 5 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 535cf517bfe..5e72a59f78a 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/features" + genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" genericoptions "k8s.io/apiserver/pkg/server/options" @@ -67,6 +68,13 @@ func createAggregatorConfig( genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} genericConfig.RESTOptionsGetter = nil + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + // Add StorageVersionPrecondition handler to aggregator-apiserver. + // The handler will block write requests to built-in resources until the + // target resources' storage versions are up-to-date. + genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition + } + // override genericConfig.AdmissionControl with kube-aggregator's scheme, // because aggregator apiserver should use its own scheme to convert its own resources. err := commandOptions.Admission.ApplyTo( diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go new file mode 100644 index 00000000000..9b164957c84 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go @@ -0,0 +1,80 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "errors" + "fmt" + "net/http" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/storageversion" + _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration + "k8s.io/klog/v2" +) + +// WithStorageVersionPrecondition checks if the storage version barrier has +// completed, if not, it only passes the following API requests: +// 1. non-resource requests, +// 2. read requests, +// 3. write requests to the storageversion API, +// 4. resources whose StorageVersion is not pending update, including non-persisted resources. +func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager) http.Handler { + if svm == nil { + // TODO(roycaihw): switch to warning after the feature graduate to beta/GA + klog.V(2).Infof("Storage Version barrier is disabled") + return handler + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if svm.Completed() { + handler.ServeHTTP(w, req) + return + } + ctx := req.Context() + requestInfo, found := request.RequestInfoFrom(ctx) + if !found { + responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context")) + return + } + // Allow non-resource requests + if !requestInfo.IsResourceRequest { + handler.ServeHTTP(w, req) + return + } + // Allow read requests + if requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch" { + handler.ServeHTTP(w, req) + return + } + // Allow writes to the storage version API + if requestInfo.APIGroup == "internal.apiserver.k8s.io" && requestInfo.Resource == "storageversions" { + handler.ServeHTTP(w, req) + return + } + // If the resource's StorageVersion is not in the to-be-updated list, let it pass. + // Non-persisted resources are not in the to-be-updated list, so they will pass. + gr := schema.GroupResource{requestInfo.APIGroup, requestInfo.Resource} + if !svm.PendingUpdate(gr) { + handler.ServeHTTP(w, req) + return + } + + responsewriters.ServiceUnavailabeError(w, req, errors.New(fmt.Sprintf("wait for storage version registration to complete for resource: %v, last seen error: %v", gr, svm.LastUpdateError(gr)))) + }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go index d13bee4d223..ea7387537f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go @@ -76,3 +76,10 @@ func InternalError(w http.ResponseWriter, req *http.Request, err error) { http.StatusInternalServerError) utilruntime.HandleError(err) } + +// ServiceUnavailabeError renders a simple internal error +func ServiceUnavailabeError(w http.ResponseWriter, req *http.Request, err error) { + http.Error(w, sanitizer.Replace(fmt.Sprintf("Service Unavailable: %q: %v", req.RequestURI, err)), + http.StatusServiceUnavailable) + utilruntime.HandleError(err) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 80be10c01fd..c14759f419d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -62,6 +62,7 @@ import ( "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" serverstore "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/storageversion" "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/informers" @@ -227,6 +228,9 @@ type Config struct { // APIServerID is the ID of this API server APIServerID string + + // StorageVersionManager holds the storage versions of the API resources installed by this server. + StorageVersionManager storageversion.Manager } type RecommendedConfig struct { @@ -331,8 +335,9 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Default to treating watch as a long-running operation // Generic API servers have no inherent long-running subresources - LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), - APIServerID: id, + LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), + APIServerID: id, + StorageVersionManager: storageversion.NewDefaultManager(), } } @@ -582,7 +587,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G maxRequestBodyBytes: c.MaxRequestBodyBytes, livezClock: clock.RealClock{}, - APIServerID: c.APIServerID, + + APIServerID: c.APIServerID, + StorageVersionManager: c.StorageVersionManager, } for { @@ -703,6 +710,12 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G return s, nil } +func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c *Config) http.Handler { + // WithStorageVersionPrecondition needs the WithRequestInfo to run first + handler := genericapifilters.WithStorageVersionPrecondition(apiHandler, c.StorageVersionManager) + return DefaultBuildHandlerChain(handler, c) +} + func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler := filterlatency.TrackCompleted(apiHandler) handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 1b22b24e274..30d187ec45d 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -24,9 +24,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" serverstorage "k8s.io/apiserver/pkg/server/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/pkg/version" openapicommon "k8s.io/kube-openapi/pkg/common" @@ -264,6 +267,34 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + // Spawn a goroutine in aggregator apiserver to update storage version for + // all built-in resources + s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(context genericapiserver.PostStartHookContext) error { + // Technically an apiserver only needs to update storage version once during bootstrap. + // Reconcile StorageVersion objects every 10 minutes will help in the case that the + // StorageVersion objects get accidentally modified/deleted by a different agent. In that + // case, the reconciliation ensures future storage migration still works. If nothing gets + // changed, the reconciliation update is a noop and gets short-circuited by the apiserver, + // therefore won't change the resource version and trigger storage migration. + go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) { + // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver) + // share the same generic apiserver config. The same StorageVersion manager is used + // to register all built-in resources when the generic apiservers install APIs. + s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(context.LoopbackClientConfig, s.GenericAPIServer.APIServerID) + return false, nil + }, context.StopCh) + // Once the storage version updater finishes the first round of update, + // the PostStartHook will return to unblock /healthz. The handler chain + // won't block write requests anymore. Check every second since it's not + // expensive. + wait.PollImmediateUntil(1*time.Second, func() (bool, error) { + return s.GenericAPIServer.StorageVersionManager.Completed(), nil + }, context.StopCh) + return nil + }) + } + return s, nil }