mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-19 16:49:35 +00:00
Merge pull request #120902 from linxiulei/watch_stack
Add handler to run execution in separate goroutine
This commit is contained in:
commit
6c04679617
@ -1173,6 +1173,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
|
||||
genericfeatures.APIServerTracing: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
genericfeatures.APIServingWithRoutine: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
genericfeatures.ConsistentListFromCache: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
genericfeatures.CustomResourceValidationExpressions: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.31
|
||||
|
@ -41,6 +41,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/tracing"
|
||||
"k8s.io/klog/v2"
|
||||
@ -259,7 +260,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
|
||||
}
|
||||
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
defer func() { cancel() }()
|
||||
watcher, err := rw.Watch(ctx, &opts)
|
||||
if err != nil {
|
||||
scope.err(err, w, req)
|
||||
@ -270,11 +271,26 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
|
||||
scope.err(err, w, req)
|
||||
return
|
||||
}
|
||||
// Invalidate cancel() to defer until serve() is complete.
|
||||
deferredCancel := cancel
|
||||
cancel = func() {}
|
||||
|
||||
serve := func() {
|
||||
defer deferredCancel()
|
||||
requestInfo, _ := request.RequestInfoFrom(ctx)
|
||||
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
|
||||
defer watcher.Stop()
|
||||
handler.ServeHTTP(w, req)
|
||||
})
|
||||
}
|
||||
|
||||
// Run watch serving in a separate goroutine to allow freeing current stack memory
|
||||
t := genericfilters.TaskFrom(req.Context())
|
||||
if t != nil {
|
||||
t.Func = serve
|
||||
} else {
|
||||
serve()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -90,6 +90,12 @@ const (
|
||||
// Add support for distributed tracing in the API Server
|
||||
APIServerTracing featuregate.Feature = "APIServerTracing"
|
||||
|
||||
// owner: @linxiulei
|
||||
// beta: v1.30
|
||||
//
|
||||
// Enables serving watch requests in separate goroutines.
|
||||
APIServingWithRoutine featuregate.Feature = "APIServingWithRoutine"
|
||||
|
||||
// owner: @cici37 @jpbetz
|
||||
// kep: http://kep.k8s.io/3488
|
||||
// alpha: v1.26
|
||||
@ -284,6 +290,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
|
||||
APIServerTracing: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
APIServingWithRoutine: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
ValidatingAdmissionPolicy: {Default: false, PreRelease: featuregate.Beta},
|
||||
|
||||
CustomResourceValidationExpressions: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.31
|
||||
|
@ -1043,6 +1043,12 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
||||
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
|
||||
}
|
||||
handler = genericapifilters.WithLatencyTrackers(handler)
|
||||
// WithRoutine will execute future handlers in a separate goroutine and serving
|
||||
// handler in current goroutine to minimize the stack memory usage. It must be
|
||||
// after WithPanicRecover() to be protected from panics.
|
||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServingWithRoutine) {
|
||||
handler = genericfilters.WithRoutine(handler, c.LongRunningFunc)
|
||||
}
|
||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
||||
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
|
||||
handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
|
||||
|
77
staging/src/k8s.io/apiserver/pkg/server/filters/routine.go
Normal file
77
staging/src/k8s.io/apiserver/pkg/server/filters/routine.go
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package filters
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
type taskKeyType int
|
||||
|
||||
const taskKey taskKeyType = iota
|
||||
|
||||
type Task struct {
|
||||
Func func()
|
||||
}
|
||||
|
||||
func WithTask(parent context.Context, t *Task) context.Context {
|
||||
return request.WithValue(parent, taskKey, t)
|
||||
}
|
||||
|
||||
func TaskFrom(ctx context.Context) *Task {
|
||||
t, _ := ctx.Value(taskKey).(*Task)
|
||||
return t
|
||||
}
|
||||
|
||||
// WithRoutine returns an http.Handler that executes preparation of long running requests (i.e. watches)
|
||||
// in a separate Goroutine and then serves the long running request in the main Goroutine. Doing so allows
|
||||
// freeing stack memory used in preparation Goroutine for better memory efficiency.
|
||||
func WithRoutine(handler http.Handler, longRunning request.LongRunningRequestCheck) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
ctx := req.Context()
|
||||
requestInfo, _ := request.RequestInfoFrom(ctx)
|
||||
if !longRunning(req, requestInfo) {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
req = req.WithContext(WithTask(ctx, &Task{}))
|
||||
panicCh := make(chan any, 1)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
panicCh <- r
|
||||
}
|
||||
close(panicCh)
|
||||
}()
|
||||
handler.ServeHTTP(w, req)
|
||||
}()
|
||||
|
||||
if p, ok := <-panicCh; ok {
|
||||
panic(p)
|
||||
}
|
||||
|
||||
ctx = req.Context()
|
||||
if t := TaskFrom(ctx); t != nil && t.Func != nil {
|
||||
t.Func()
|
||||
}
|
||||
|
||||
})
|
||||
}
|
@ -0,0 +1,79 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package filters
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func TestPropogatingPanic(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
klog.SetOutput(&buf)
|
||||
klog.LogToStderr(false)
|
||||
defer klog.LogToStderr(true)
|
||||
|
||||
panicMsg := "panic as designed"
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
panic(panicMsg)
|
||||
})
|
||||
resolver := &request.RequestInfoFactory{
|
||||
APIPrefixes: sets.NewString("api", "apis"),
|
||||
GrouplessAPIPrefixes: sets.NewString("api"),
|
||||
}
|
||||
ts := httptest.NewServer(WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
|
||||
defer ts.Close()
|
||||
_, err := http.Get(ts.URL)
|
||||
if err == nil {
|
||||
t.Error("expected to receive an error")
|
||||
}
|
||||
|
||||
klog.Flush()
|
||||
klog.SetOutput(&bytes.Buffer{}) // prevent further writes into buf
|
||||
capturedOutput := buf.String()
|
||||
|
||||
if !strings.Contains(capturedOutput, panicMsg) || !strings.Contains(capturedOutput, "apiserver panic'd") {
|
||||
t.Errorf("unexpected out captured actual = %v", capturedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecutionWithRoutine(t *testing.T) {
|
||||
var executed bool
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
t := TaskFrom(r.Context())
|
||||
t.Func = func() {
|
||||
executed = true
|
||||
}
|
||||
})
|
||||
ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
|
||||
defer ts.Close()
|
||||
|
||||
_, err := http.Get(ts.URL)
|
||||
if err != nil {
|
||||
t.Errorf("got unexpected error on request: %v", err)
|
||||
}
|
||||
if !executed {
|
||||
t.Error("expected to execute")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user