Revert "plumb context with request deadline"

This reverts commit 83f869ee13.
This commit is contained in:
Jordan Liggitt 2020-11-19 18:15:04 -05:00
parent 3e21057070
commit afd92b3b3e
16 changed files with 67 additions and 389 deletions

View File

@ -39,14 +39,7 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
handler = genericapifilters.WithAuthentication(handler, server.InsecureSuperuser{}, nil, nil)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
// WithRequestDeadline sets a deadline for the request context appropriately
handler = genericapifilters.WithRequestDeadline(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithWarningRecorder(handler)

View File

@ -53,7 +53,6 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -73,7 +73,6 @@ import (
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/filters"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
@ -287,7 +286,6 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
// simplified long-running check
return requestInfo.Verb == "watch" || requestInfo.Verb == "proxy"
})
handler = genericapifilters.WithRequestDeadline(handler, testLongRunningCheck, 60*time.Second)
handler = genericapifilters.WithRequestInfo(handler, testRequestInfoResolver())
return &defaultAPIServer{handler, container}
@ -300,11 +298,6 @@ func testRequestInfoResolver() *request.RequestInfoFactory {
}
}
var testLongRunningCheck = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
func TestSimpleSetupRight(t *testing.T) {
s := &genericapitesting.Simple{ObjectMeta: metav1.ObjectMeta{Name: "aName"}}
wire, err := runtime.Encode(codec, s)

View File

@ -16,7 +16,6 @@ go_test(
"cachecontrol_test.go",
"impersonation_test.go",
"metrics_test.go",
"request_deadline_test.go",
"request_received_time_test.go",
"requestinfo_test.go",
"warning_test.go",
@ -57,7 +56,6 @@ go_library(
"doc.go",
"impersonation.go",
"metrics.go",
"request_deadline.go",
"request_received_time.go",
"requestinfo.go",
"storageversion.go",

View File

@ -1,105 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"context"
"errors"
"fmt"
"k8s.io/klog/v2"
"net/http"
"time"
"k8s.io/apiserver/pkg/endpoints/request"
)
var (
// The 'timeout' query parameter in the request URL has an invalid timeout specifier
errInvalidTimeoutInURL = errors.New("invalid timeout specified in the request URL")
// The timeout specified in the request URL exceeds the global maximum timeout allowed by the apiserver.
errTimeoutExceedsMaximumAllowed = errors.New("timeout specified in the request URL exceeds the maximum timeout allowed by the server")
)
// WithRequestDeadline determines the deadline of the given request and sets a new context with the appropriate timeout.
// requestTimeoutMaximum specifies the default request timeout value
func WithRequestDeadline(handler http.Handler, longRunning request.LongRunningRequestCheck, requestTimeoutMaximum time.Duration) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
handleError(w, req, http.StatusInternalServerError, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
return
}
if longRunning(req, requestInfo) {
handler.ServeHTTP(w, req)
return
}
userSpecifiedTimeout, ok, err := parseTimeout(req)
if err != nil {
statusCode := http.StatusInternalServerError
if err == errInvalidTimeoutInURL {
statusCode = http.StatusBadRequest
}
handleError(w, req, statusCode, err)
return
}
timeout := requestTimeoutMaximum
if ok {
if userSpecifiedTimeout > requestTimeoutMaximum {
handleError(w, req, http.StatusBadRequest, errTimeoutExceedsMaximumAllowed)
return
}
timeout = userSpecifiedTimeout
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req = req.WithContext(ctx)
handler.ServeHTTP(w, req)
})
}
// parseTimeout parses the given HTTP request URL and extracts the timeout query parameter
// value if specified by the user.
// If a timeout is not specified the function returns false and err is set to nil
// If the value specified is malformed then the function returns false and err is set
func parseTimeout(req *http.Request) (time.Duration, bool, error) {
value := req.URL.Query().Get("timeout")
if value == "" {
return 0, false, nil
}
timeout, err := time.ParseDuration(value)
if err != nil {
return 0, false, errInvalidTimeoutInURL
}
return timeout, true, nil
}
func handleError(w http.ResponseWriter, r *http.Request, code int, err error) {
errorMsg := fmt.Sprintf("Error - %s: %#v", err.Error(), r.RequestURI)
http.Error(w, errorMsg, code)
klog.Errorf(err.Error())
}

View File

@ -1,192 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"k8s.io/apiserver/pkg/endpoints/request"
)
func TestParseTimeout(t *testing.T) {
tests := []struct {
name string
url string
expected bool
timeoutExpected time.Duration
errExpected error
}{
{
name: "the user does not specify a timeout",
url: "/api/v1/namespaces",
},
{
name: "the user specifies a valid timeout",
url: "/api/v1/namespaces?timeout=10s",
expected: true,
timeoutExpected: 10 * time.Second,
},
{
name: "the use specifies an invalid timeout",
url: "/api/v1/namespaces?timeout=foo",
errExpected: errInvalidTimeoutInURL,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
request, err := http.NewRequest(http.MethodGet, test.url, nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
timeoutGot, ok, err := parseTimeout(request)
if test.expected != ok {
t.Errorf("expected: %t, but got: %t", test.expected, ok)
}
if test.errExpected != err {
t.Errorf("expected err: %v, but got: %v", test.errExpected, err)
}
if test.timeoutExpected != timeoutGot {
t.Errorf("expected timeout: %s, but got: %s", test.timeoutExpected, timeoutGot)
}
})
}
}
func TestWithRequestDeadline(t *testing.T) {
const requestTimeoutMaximum = 60 * time.Second
tests := []struct {
name string
requestURL string
longRunning bool
hasDeadlineExpected bool
deadlineExpected time.Duration
handlerCallCountExpected int
statusCodeExpected int
}{
{
name: "the user specifies a valid request timeout",
requestURL: "/api/v1/namespaces?timeout=15s",
longRunning: false,
handlerCallCountExpected: 1,
hasDeadlineExpected: true,
deadlineExpected: 14 * time.Second, // to account for the delay in verification
statusCodeExpected: http.StatusOK,
},
{
name: "the user does not specify any request timeout, default deadline is expected to be set",
requestURL: "/api/v1/namespaces?timeout=",
longRunning: false,
handlerCallCountExpected: 1,
hasDeadlineExpected: true,
deadlineExpected: requestTimeoutMaximum - time.Second, // to account for the delay in verification
statusCodeExpected: http.StatusOK,
},
{
name: "the request is long running, no deadline is expected to be set",
requestURL: "/api/v1/namespaces?timeout=10s",
longRunning: true,
hasDeadlineExpected: false,
handlerCallCountExpected: 1,
statusCodeExpected: http.StatusOK,
},
{
name: "the timeout specified is malformed, the request is aborted with HTTP 400",
requestURL: "/api/v1/namespaces?timeout=foo",
longRunning: false,
statusCodeExpected: http.StatusBadRequest,
},
{
name: "the timeout specified exceeds the maximum deadline allowed, the request is aborted with HTTP 400",
requestURL: fmt.Sprintf("/api/v1/namespaces?timeout=%s", requestTimeoutMaximum+time.Second),
longRunning: false,
statusCodeExpected: http.StatusBadRequest,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
callCount int
hasDeadlineGot bool
deadlineGot time.Duration
)
handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {
callCount++
deadlineGot, hasDeadlineGot = deadline(req)
})
withDeadline := WithRequestDeadline(
handler, func(_ *http.Request, _ *request.RequestInfo) bool { return test.longRunning }, requestTimeoutMaximum)
withDeadline = WithRequestInfo(withDeadline, &fakeRequestResolver{})
testRequest, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
// make sure a default request does not have any deadline set
remaning, ok := deadline(testRequest)
if ok {
t.Fatalf("test setup failed, expected the new HTTP request context to have no deadline but got: %s", remaning)
}
w := httptest.NewRecorder()
withDeadline.ServeHTTP(w, testRequest)
if test.handlerCallCountExpected != callCount {
t.Errorf("expected the request handler to be invoked %d times, but was actually invoked %d times", test.handlerCallCountExpected, callCount)
}
if test.hasDeadlineExpected != hasDeadlineGot {
t.Errorf("expected the request context to have deadline set: %t but got: %t", test.hasDeadlineExpected, hasDeadlineGot)
}
deadlineGot = deadlineGot.Truncate(time.Second)
if test.deadlineExpected != deadlineGot {
t.Errorf("expected a request context with a deadline of %s but got: %s", test.deadlineExpected, deadlineGot)
}
statusCodeGot := w.Result().StatusCode
if test.statusCodeExpected != statusCodeGot {
t.Errorf("expected status code %d but got: %d", test.statusCodeExpected, statusCodeGot)
}
})
}
}
type fakeRequestResolver struct{}
func (r fakeRequestResolver) NewRequestInfo(req *http.Request) (*request.RequestInfo, error) {
return &request.RequestInfo{}, nil
}
func deadline(r *http.Request) (time.Duration, bool) {
if deadline, ok := r.Context().Deadline(); ok {
remaining := time.Until(deadline)
return remaining, ok
}
return 0, false
}

View File

@ -57,6 +57,9 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
return
}
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.URL.Query().Get("timeout"))
namespace, name, err := scope.Namer.Name(req)
if err != nil {
if includeName {
@ -73,7 +76,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
}
ctx, cancel := context.WithTimeout(req.Context(), requestTimeout)
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
@ -154,7 +157,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finishRequest(timeout, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
if err != nil {

View File

@ -54,12 +54,15 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
return
}
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.URL.Query().Get("timeout"))
namespace, name, err := scope.Namer.Name(req)
if err != nil {
scope.err(err, w, req)
return
}
ctx, cancel := context.WithTimeout(req.Context(), requestTimeout)
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
ae := request.AuditEventFrom(ctx)
@ -120,7 +123,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
wasDeleted := true
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finishRequest(timeout, func() (runtime.Object, error) {
obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
wasDeleted = deleted
return obj, err
@ -169,13 +172,16 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
return
}
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.URL.Query().Get("timeout"))
namespace, err := scope.Namer.Namespace(req)
if err != nil {
scope.err(err, w, req)
return
}
ctx, cancel := context.WithTimeout(req.Context(), requestTimeout)
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
ae := request.AuditEventFrom(ctx)
@ -259,7 +265,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
admit = admission.WithAudit(admit, ae)
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.DeleteCollection(ctx, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options, &listOptions)
})
if err != nil {

View File

@ -84,13 +84,18 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
return
}
// TODO: we either want to remove timeout or document it (if we
// document, move timeout out of this function and declare it in
// api_installer)
timeout := parseTimeout(req.URL.Query().Get("timeout"))
namespace, name, err := scope.Namer.Name(req)
if err != nil {
scope.err(err, w, req)
return
}
ctx, cancel := context.WithTimeout(req.Context(), requestTimeout)
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
@ -203,6 +208,7 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
codec: codec,
timeout: timeout,
options: options,
restPatcher: r,
@ -265,6 +271,7 @@ type patcher struct {
codec runtime.Codec
timeout time.Duration
options *metav1.PatchOptions
// Operation information
@ -584,7 +591,7 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
wasCreated = created
return updateObject, updateErr
}
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finishRequest(p.timeout, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.

View File

@ -53,9 +53,6 @@ import (
)
const (
// 34 chose as a number close to 30 that is likely to be unique enough to jump out at me the next time I see a timeout.
// Everyone chooses 30.
requestTimeout = 34 * time.Second
// DuplicateOwnerReferencesWarningFormat is the warning that a client receives when a create/update request contains
// duplicate owner reference entries.
DuplicateOwnerReferencesWarningFormat = ".metadata.ownerReferences contains duplicate entries; API server dedups owner references in 1.20+, and may reject such requests as early as 1.24; please fix your requests; duplicate UID(s) observed: %v"
@ -230,7 +227,7 @@ type resultFunc func() (runtime.Object, error)
// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response.
// An api.Status object with status != success is considered an "error", which interrupts the normal response flow.
func finishRequest(ctx context.Context, fn resultFunc) (result runtime.Object, err error) {
func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) {
// these channels need to be buffered to prevent the goroutine below from hanging indefinitely
// when the select statement reads something other than the one the goroutine sends on.
ch := make(chan runtime.Object, 1)
@ -274,8 +271,8 @@ func finishRequest(ctx context.Context, fn resultFunc) (result runtime.Object, e
return nil, err
case p := <-panicCh:
panic(p)
case <-ctx.Done():
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0)
case <-time.After(timeout):
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", timeout), 0)
}
}

View File

@ -456,6 +456,8 @@ func (tc *patchTestCase) Run(t *testing.T) {
codec: codec,
timeout: 1 * time.Second,
restPatcher: testPatcher,
name: name,
patchType: patchType,
@ -464,10 +466,7 @@ func (tc *patchTestCase) Run(t *testing.T) {
trace: utiltrace.New("Patch", utiltrace.Field{"name", name}),
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
resultObj, _, err := p.patchResource(ctx, &RequestScope{})
cancel()
if len(tc.expectedError) != 0 {
if err == nil || err.Error() != tc.expectedError {
t.Errorf("%s: expected error %v, but got %v", tc.name, tc.expectedError, err)
@ -843,13 +842,9 @@ func TestFinishRequest(t *testing.T) {
exampleErr := fmt.Errorf("error")
successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"}
errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"}
timeoutFunc := func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.TODO(), time.Second)
}
testcases := []struct {
name string
timeout func() (context.Context, context.CancelFunc)
timeout time.Duration
fn resultFunc
expectedObj runtime.Object
expectedErr error
@ -859,7 +854,7 @@ func TestFinishRequest(t *testing.T) {
}{
{
name: "Expected obj is returned",
timeout: timeoutFunc,
timeout: time.Second,
fn: func() (runtime.Object, error) {
return exampleObj, nil
},
@ -868,7 +863,7 @@ func TestFinishRequest(t *testing.T) {
},
{
name: "Expected error is returned",
timeout: timeoutFunc,
timeout: time.Second,
fn: func() (runtime.Object, error) {
return nil, exampleErr
},
@ -877,7 +872,7 @@ func TestFinishRequest(t *testing.T) {
},
{
name: "Successful status object is returned as expected",
timeout: timeoutFunc,
timeout: time.Second,
fn: func() (runtime.Object, error) {
return successStatusObj, nil
},
@ -886,7 +881,7 @@ func TestFinishRequest(t *testing.T) {
},
{
name: "Error status object is converted to StatusError",
timeout: timeoutFunc,
timeout: time.Second,
fn: func() (runtime.Object, error) {
return errorStatusObj, nil
},
@ -895,7 +890,7 @@ func TestFinishRequest(t *testing.T) {
},
{
name: "Panic is propagated up",
timeout: timeoutFunc,
timeout: time.Second,
fn: func() (runtime.Object, error) {
panic("my panic")
},
@ -905,7 +900,7 @@ func TestFinishRequest(t *testing.T) {
},
{
name: "Panic is propagated with stack",
timeout: timeoutFunc,
timeout: time.Second,
fn: func() (runtime.Object, error) {
panic("my panic")
},
@ -915,7 +910,7 @@ func TestFinishRequest(t *testing.T) {
},
{
name: "http.ErrAbortHandler panic is propagated without wrapping with stack",
timeout: timeoutFunc,
timeout: time.Second,
fn: func() (runtime.Object, error) {
panic(http.ErrAbortHandler)
},
@ -927,10 +922,7 @@ func TestFinishRequest(t *testing.T) {
}
for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := tc.timeout()
defer func() {
cancel()
r := recover()
switch {
case r == nil && len(tc.expectedPanic) > 0:
@ -945,7 +937,7 @@ func TestFinishRequest(t *testing.T) {
t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r)
}
}()
obj, err := finishRequest(ctx, tc.fn)
obj, err := finishRequest(tc.timeout, tc.fn)
if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) {
t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err)
}

View File

@ -54,12 +54,15 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
return
}
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.URL.Query().Get("timeout"))
namespace, name, err := scope.Namer.Name(req)
if err != nil {
scope.err(err, w, req)
return
}
ctx, cancel := context.WithTimeout(req.Context(), requestTimeout)
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
@ -192,7 +195,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finishRequest(timeout, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.

View File

@ -746,14 +746,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = filterlatency.TrackStarted(handler, "authentication")
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
// WithRequestDeadline sets a deadline for the request context appropriately
handler = genericapifilters.WithRequestDeadline(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {

View File

@ -18,12 +18,14 @@ package filters
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"runtime"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -32,33 +34,37 @@ import (
)
// WithTimeoutForNonLongRunningRequests times out non-long-running requests after the time given by timeout.
func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning apirequest.LongRunningRequestCheck) http.Handler {
func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, timeout time.Duration) http.Handler {
if longRunning == nil {
return handler
}
timeoutFunc := func(req *http.Request) (*http.Request, bool, func(), *apierrors.StatusError) {
timeoutFunc := func(req *http.Request) (*http.Request, <-chan time.Time, func(), *apierrors.StatusError) {
// TODO unify this with apiserver.MaxInFlightLimit
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no request info
return req, false, func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
return req, time.After(timeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
}
if longRunning(req, requestInfo) {
return req, true, nil, nil
return req, nil, nil, nil
}
ctx, cancel := context.WithCancel(ctx)
req = req.WithContext(ctx)
postTimeoutFn := func() {
cancel()
metrics.RecordRequestTermination(req, requestInfo, metrics.APIServerComponent, http.StatusGatewayTimeout)
}
return req, false, postTimeoutFn, apierrors.NewTimeoutError("request did not complete within the allotted timeout", 0)
return req, time.After(timeout), postTimeoutFn, apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within %s", timeout), 0)
}
return WithTimeout(handler, timeoutFunc)
}
type timeoutFunc = func(*http.Request) (req *http.Request, longRunning bool, postTimeoutFunc func(), err *apierrors.StatusError)
type timeoutFunc = func(*http.Request) (req *http.Request, timeout <-chan time.Time, postTimeoutFunc func(), err *apierrors.StatusError)
// WithTimeout returns an http.Handler that runs h with a timeout
// determined by timeoutFunc. The new http.Handler calls h.ServeHTTP to handle
@ -79,14 +85,12 @@ type timeoutHandler struct {
}
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r, longRunning, postTimeoutFn, err := t.timeout(r)
if longRunning {
r, after, postTimeoutFn, err := t.timeout(r)
if after == nil {
t.handler.ServeHTTP(w, r)
return
}
timeout := r.Context().Done()
// resultCh is used as both errCh and stopCh
resultCh := make(chan interface{})
tw := newTimeoutWriter(w)
@ -113,7 +117,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
panic(err)
}
return
case <-timeout:
case <-after:
defer func() {
// resultCh needs to have a reader, since the function doing
// the work needs to send to it. This is defer'd to ensure it runs

View File

@ -18,7 +18,6 @@ package filters
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -93,27 +92,18 @@ func TestTimeout(t *testing.T) {
timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0)
record := &recorder{}
var ctx context.Context
withDeadline := func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(ctx)
handler.ServeHTTP(w, req)
})
}
handler := newHandler(sendResponse, doPanic, writeErrors)
ts := httptest.NewServer(withDeadline(withPanicRecovery(
WithTimeout(handler, func(req *http.Request) (*http.Request, bool, func(), *apierrors.StatusError) {
return req, false, record.Record, timeoutErr
ts := httptest.NewServer(withPanicRecovery(
WithTimeout(handler, func(req *http.Request) (*http.Request, <-chan time.Time, func(), *apierrors.StatusError) {
return req, timeout, record.Record, timeoutErr
}), func(w http.ResponseWriter, req *http.Request, err interface{}) {
gotPanic <- err
http.Error(w, "This request caused apiserver to panic. Look in the logs for details.", http.StatusInternalServerError)
}),
))
)
defer ts.Close()
// No timeouts
ctx = context.Background()
sendResponse <- resp
res, err := http.Get(ts.URL)
if err != nil {
@ -134,8 +124,6 @@ func TestTimeout(t *testing.T) {
}
// Times out
ctx, cancel := context.WithCancel(context.Background())
cancel()
timeout <- time.Time{}
res, err = http.Get(ts.URL)
if err != nil {
@ -157,7 +145,6 @@ func TestTimeout(t *testing.T) {
}
// Now try to send a response
ctx = context.Background()
sendResponse <- resp
if err := <-writeErrors; err != http.ErrHandlerTimeout {
t.Errorf("got Write error of %v; expected %v", err, http.ErrHandlerTimeout)
@ -183,7 +170,6 @@ func TestTimeout(t *testing.T) {
}
// Panics with http.ErrAbortHandler
ctx = context.Background()
doPanic <- http.ErrAbortHandler
res, err = http.Get(ts.URL)
if err != nil {

View File

@ -124,7 +124,7 @@ func testWebhookTimeout(t *testing.T, watchCache bool) {
},
{
name: "timed out client requests skip later mutating webhooks (regardless of failure policy) and fail",
timeoutSeconds: 4,
timeoutSeconds: 3,
mutatingWebhooks: []testWebhook{
{path: "/mutating/1/5s", policy: admissionv1beta1.Ignore, timeoutSeconds: 4},
{path: "/mutating/2/1s", policy: admissionv1beta1.Ignore, timeoutSeconds: 5},
@ -133,7 +133,8 @@ func testWebhookTimeout(t *testing.T, watchCache bool) {
expectInvocations: []invocation{
{path: "/mutating/1/5s", timeoutSeconds: 3}, // from request
},
expectError: true,
expectError: true,
errorContains: "request did not complete within requested timeout",
},
}