diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index d928deed09c..3cc4a728a57 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -41,7 +41,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" - genericfilters "k8s.io/apiserver/pkg/server/filters" + "k8s.io/apiserver/pkg/server/routine" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/tracing" "k8s.io/klog/v2" @@ -285,7 +285,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc } // Run watch serving in a separate goroutine to allow freeing current stack memory - t := genericfilters.TaskFrom(req.Context()) + t := routine.TaskFrom(req.Context()) if t != nil { t.Func = serve } else { diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 6f0ca1bcac4..62f077f24b3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -64,6 +64,7 @@ import ( genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" + "k8s.io/apiserver/pkg/server/routine" serverstore "k8s.io/apiserver/pkg/server/storage" storagevalue "k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storageversion" @@ -1055,7 +1056,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { // handler in current goroutine to minimize the stack memory usage. It must be // after WithPanicRecover() to be protected from panics. if c.FeatureGate.Enabled(genericfeatures.APIServingWithRoutine) { - handler = genericfilters.WithRoutine(handler, c.LongRunningFunc) + handler = routine.WithRoutine(handler, c.LongRunningFunc) } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) handler = genericapifilters.WithRequestReceivedTimestamp(handler) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/routine_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/wrap_test.go similarity index 69% rename from staging/src/k8s.io/apiserver/pkg/server/filters/routine_test.go rename to staging/src/k8s.io/apiserver/pkg/server/filters/wrap_test.go index fbfce608069..561a18a5581 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/routine_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/wrap_test.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Kubernetes Authors. +Copyright 2024 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/server/routine" "k8s.io/klog/v2" ) @@ -42,7 +43,7 @@ func TestPropogatingPanic(t *testing.T) { APIPrefixes: sets.NewString("api", "apis"), GrouplessAPIPrefixes: sets.NewString("api"), } - ts := httptest.NewServer(WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true })) + ts := httptest.NewServer(routine.WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true })) defer ts.Close() _, err := http.Get(ts.URL) if err == nil { @@ -57,23 +58,3 @@ func TestPropogatingPanic(t *testing.T) { t.Errorf("unexpected out captured actual = %v", capturedOutput) } } - -func TestExecutionWithRoutine(t *testing.T) { - var executed bool - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - t := TaskFrom(r.Context()) - t.Func = func() { - executed = true - } - }) - ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true })) - defer ts.Close() - - _, err := http.Get(ts.URL) - if err != nil { - t.Errorf("got unexpected error on request: %v", err) - } - if !executed { - t.Error("expected to execute") - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/httplog/httplog.go b/staging/src/k8s.io/apiserver/pkg/server/httplog/httplog.go index a4b4c5899fd..c64f5771d8b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/httplog/httplog.go +++ b/staging/src/k8s.io/apiserver/pkg/server/httplog/httplog.go @@ -31,6 +31,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/responsewriter" + "k8s.io/apiserver/pkg/server/routine" "k8s.io/klog/v2" ) @@ -125,10 +126,26 @@ func withLogging(handler http.Handler, stackTracePred StacktracePred, shouldLogR rl := newLoggedWithStartTime(req, w, startTime) rl.StacktraceWhen(stackTracePred) req = req.WithContext(context.WithValue(ctx, respLoggerContextKey, rl)) - defer rl.Log() + + var logFunc func() + logFunc = rl.Log + defer func() { + if logFunc != nil { + logFunc() + } + }() w = responsewriter.WrapForHTTP1Or2(rl) handler.ServeHTTP(w, req) + + // We need to ensure that the request is logged after it is processed. + // In case the request is executed in a separate goroutine created via + // WithRoutine handler in the handler chain (i.e. above handler.ServeHTTP() + // would return request is completely responsed), we want the logging to + // happen in that goroutine too, so we append it to the task. + if routine.AppendTask(ctx, &routine.Task{Func: rl.Log}) { + logFunc = nil + } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/routine.go b/staging/src/k8s.io/apiserver/pkg/server/routine/routine.go similarity index 83% rename from staging/src/k8s.io/apiserver/pkg/server/filters/routine.go rename to staging/src/k8s.io/apiserver/pkg/server/routine/routine.go index 3f4dfa2bb22..c4b23c59034 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/routine.go +++ b/staging/src/k8s.io/apiserver/pkg/server/routine/routine.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package filters +package routine import ( "context" @@ -35,6 +35,20 @@ func WithTask(parent context.Context, t *Task) context.Context { return request.WithValue(parent, taskKey, t) } +// AppendTask appends a task executed after completion of existing task. +// It is a no-op if there is no existing task. +func AppendTask(ctx context.Context, t *Task) bool { + if existTask := TaskFrom(ctx); existTask != nil && existTask.Func != nil { + existFunc := existTask.Func + existTask.Func = func() { + existFunc() + t.Func() + } + return true + } + return false +} + func TaskFrom(ctx context.Context) *Task { t, _ := ctx.Value(taskKey).(*Task) return t diff --git a/staging/src/k8s.io/apiserver/pkg/server/routine/routine_test.go b/staging/src/k8s.io/apiserver/pkg/server/routine/routine_test.go new file mode 100644 index 00000000000..1d4249b5520 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/routine/routine_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package routine + +import ( + "net/http" + "net/http/httptest" + "testing" + + "k8s.io/apiserver/pkg/endpoints/request" +) + +func TestExecutionWithRoutine(t *testing.T) { + var executed bool + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t := TaskFrom(r.Context()) + t.Func = func() { + executed = true + } + }) + ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true })) + defer ts.Close() + + _, err := http.Get(ts.URL) + if err != nil { + t.Errorf("got unexpected error on request: %v", err) + } + if !executed { + t.Error("expected to execute") + } +} + +func TestAppendTask(t *testing.T) { + tests := []struct { + name string + existingTask bool + taskAppended bool + shouldExecute bool + }{ + { + name: "append task when existing", + existingTask: true, + taskAppended: true, + shouldExecute: true, + }, + { + name: "not append task when no existing tasks", + existingTask: false, + taskAppended: false, + shouldExecute: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var executed, appended bool + taskToAppend := func() { + executed = true + } + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + if test.existingTask { + t := TaskFrom(ctx) + t.Func = func() {} + } + + appended = AppendTask(ctx, &Task{taskToAppend}) + }) + ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true })) + defer ts.Close() + + _, err := http.Get(ts.URL) + if err != nil { + t.Errorf("got unexpected error on request: %v", err) + } + + if test.taskAppended != appended { + t.Errorf("expected taskAppended: %t, got: %t", test.taskAppended, executed) + } + + if test.shouldExecute != executed { + t.Errorf("expected shouldExecute: %t, got: %t", test.shouldExecute, executed) + } + }) + } +}