diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 47e3c19002e..328c3e23cec 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -1173,6 +1173,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS genericfeatures.APIServerTracing: {Default: true, PreRelease: featuregate.Beta}, + genericfeatures.APIServingWithRoutine: {Default: true, PreRelease: featuregate.Beta}, + genericfeatures.ConsistentListFromCache: {Default: false, PreRelease: featuregate.Alpha}, genericfeatures.CustomResourceValidationExpressions: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.31 diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index a8f2eee72b5..d928deed09c 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -41,6 +41,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" + genericfilters "k8s.io/apiserver/pkg/server/filters" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/tracing" "k8s.io/klog/v2" @@ -259,7 +260,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc } klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout) ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() + defer func() { cancel() }() watcher, err := rw.Watch(ctx, &opts) if err != nil { scope.err(err, w, req) @@ -270,11 +271,26 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc scope.err(err, w, req) return } - requestInfo, _ := request.RequestInfoFrom(ctx) - metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { - defer watcher.Stop() - handler.ServeHTTP(w, req) - }) + // Invalidate cancel() to defer until serve() is complete. + deferredCancel := cancel + cancel = func() {} + + serve := func() { + defer deferredCancel() + requestInfo, _ := request.RequestInfoFrom(ctx) + metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { + defer watcher.Stop() + handler.ServeHTTP(w, req) + }) + } + + // Run watch serving in a separate goroutine to allow freeing current stack memory + t := genericfilters.TaskFrom(req.Context()) + if t != nil { + t.Func = serve + } else { + serve() + } return } diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 6d484c0a8d6..b363cd2060b 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -90,6 +90,12 @@ const ( // Add support for distributed tracing in the API Server APIServerTracing featuregate.Feature = "APIServerTracing" + // owner: @linxiulei + // beta: v1.30 + // + // Enables serving watch requests in separate goroutines. + APIServingWithRoutine featuregate.Feature = "APIServingWithRoutine" + // owner: @cici37 @jpbetz // kep: http://kep.k8s.io/3488 // alpha: v1.26 @@ -284,6 +290,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS APIServerTracing: {Default: true, PreRelease: featuregate.Beta}, + APIServingWithRoutine: {Default: true, PreRelease: featuregate.Beta}, + ValidatingAdmissionPolicy: {Default: false, PreRelease: featuregate.Beta}, CustomResourceValidationExpressions: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.31 diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index a48bee2c939..81a7d6ddb3a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -1043,6 +1043,12 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithTracing(handler, c.TracerProvider) } handler = genericapifilters.WithLatencyTrackers(handler) + // WithRoutine will execute future handlers in a separate goroutine and serving + // handler in current goroutine to minimize the stack memory usage. It must be + // after WithPanicRecover() to be protected from panics. + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServingWithRoutine) { + handler = genericfilters.WithRoutine(handler, c.LongRunningFunc) + } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) handler = genericapifilters.WithRequestReceivedTimestamp(handler) handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled()) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/routine.go b/staging/src/k8s.io/apiserver/pkg/server/filters/routine.go new file mode 100644 index 00000000000..3f4dfa2bb22 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/routine.go @@ -0,0 +1,77 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "context" + "net/http" + + "k8s.io/apiserver/pkg/endpoints/request" +) + +type taskKeyType int + +const taskKey taskKeyType = iota + +type Task struct { + Func func() +} + +func WithTask(parent context.Context, t *Task) context.Context { + return request.WithValue(parent, taskKey, t) +} + +func TaskFrom(ctx context.Context) *Task { + t, _ := ctx.Value(taskKey).(*Task) + return t +} + +// WithRoutine returns an http.Handler that executes preparation of long running requests (i.e. watches) +// in a separate Goroutine and then serves the long running request in the main Goroutine. Doing so allows +// freeing stack memory used in preparation Goroutine for better memory efficiency. +func WithRoutine(handler http.Handler, longRunning request.LongRunningRequestCheck) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + requestInfo, _ := request.RequestInfoFrom(ctx) + if !longRunning(req, requestInfo) { + handler.ServeHTTP(w, req) + return + } + + req = req.WithContext(WithTask(ctx, &Task{})) + panicCh := make(chan any, 1) + go func() { + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + close(panicCh) + }() + handler.ServeHTTP(w, req) + }() + + if p, ok := <-panicCh; ok { + panic(p) + } + + ctx = req.Context() + if t := TaskFrom(ctx); t != nil && t.Func != nil { + t.Func() + } + + }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/routine_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/routine_test.go new file mode 100644 index 00000000000..fbfce608069 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/routine_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "bytes" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog/v2" +) + +func TestPropogatingPanic(t *testing.T) { + var buf bytes.Buffer + klog.SetOutput(&buf) + klog.LogToStderr(false) + defer klog.LogToStderr(true) + + panicMsg := "panic as designed" + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + panic(panicMsg) + }) + resolver := &request.RequestInfoFactory{ + APIPrefixes: sets.NewString("api", "apis"), + GrouplessAPIPrefixes: sets.NewString("api"), + } + ts := httptest.NewServer(WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true })) + defer ts.Close() + _, err := http.Get(ts.URL) + if err == nil { + t.Error("expected to receive an error") + } + + klog.Flush() + klog.SetOutput(&bytes.Buffer{}) // prevent further writes into buf + capturedOutput := buf.String() + + if !strings.Contains(capturedOutput, panicMsg) || !strings.Contains(capturedOutput, "apiserver panic'd") { + t.Errorf("unexpected out captured actual = %v", capturedOutput) + } +} + +func TestExecutionWithRoutine(t *testing.T) { + var executed bool + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t := TaskFrom(r.Context()) + t.Func = func() { + executed = true + } + }) + ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true })) + defer ts.Close() + + _, err := http.Get(ts.URL) + if err != nil { + t.Errorf("got unexpected error on request: %v", err) + } + if !executed { + t.Error("expected to execute") + } +}