mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-17 15:50:10 +00:00
genericapiserver/api/filters: cut off pkg/api dependency and fix timeout status
This commit is contained in:
parent
8800015e28
commit
128594d17c
@ -20,10 +20,10 @@ go_library(
|
|||||||
],
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api:go_default_library",
|
|
||||||
"//pkg/util:go_default_library",
|
"//pkg/util:go_default_library",
|
||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||||
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
|
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
|
||||||
@ -43,6 +43,7 @@ go_test(
|
|||||||
deps = [
|
deps = [
|
||||||
"//pkg/genericapiserver/endpoints/filters:go_default_library",
|
"//pkg/genericapiserver/endpoints/filters:go_default_library",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||||
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
|
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
|
||||||
],
|
],
|
||||||
|
@ -25,9 +25,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const globalTimeout = time.Minute
|
const globalTimeout = time.Minute
|
||||||
@ -39,22 +39,24 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
|
|||||||
if longRunning == nil {
|
if longRunning == nil {
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
timeoutFunc := func(req *http.Request) (<-chan time.Time, string) {
|
timeoutFunc := func(req *http.Request) (<-chan time.Time, *apierrors.StatusError) {
|
||||||
// TODO unify this with apiserver.MaxInFlightLimit
|
// TODO unify this with apiserver.MaxInFlightLimit
|
||||||
ctx, ok := requestContextMapper.Get(req)
|
ctx, ok := requestContextMapper.Get(req)
|
||||||
if !ok {
|
if !ok {
|
||||||
return time.After(globalTimeout), ""
|
// if this happens, the handler chain isn't setup correctly because there is no context mapper
|
||||||
|
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout"))
|
||||||
}
|
}
|
||||||
|
|
||||||
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
return time.After(globalTimeout), ""
|
// if this happens, the handler chain isn't setup correctly because there is no request info
|
||||||
|
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
|
||||||
}
|
}
|
||||||
|
|
||||||
if longRunning(req, requestInfo) {
|
if longRunning(req, requestInfo) {
|
||||||
return nil, ""
|
return nil, nil
|
||||||
}
|
}
|
||||||
return time.After(globalTimeout), ""
|
return time.After(globalTimeout), apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
|
||||||
}
|
}
|
||||||
return WithTimeout(handler, timeoutFunc)
|
return WithTimeout(handler, timeoutFunc)
|
||||||
}
|
}
|
||||||
@ -67,17 +69,17 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
|
|||||||
// the handler times out, writes by h to its http.ResponseWriter will return
|
// the handler times out, writes by h to its http.ResponseWriter will return
|
||||||
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
|
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
|
||||||
// timeout will be enforced.
|
// timeout will be enforced.
|
||||||
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, msg string)) http.Handler {
|
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, err *apierrors.StatusError)) http.Handler {
|
||||||
return &timeoutHandler{h, timeoutFunc}
|
return &timeoutHandler{h, timeoutFunc}
|
||||||
}
|
}
|
||||||
|
|
||||||
type timeoutHandler struct {
|
type timeoutHandler struct {
|
||||||
handler http.Handler
|
handler http.Handler
|
||||||
timeout func(*http.Request) (<-chan time.Time, string)
|
timeout func(*http.Request) (<-chan time.Time, *apierrors.StatusError)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
after, msg := t.timeout(r)
|
after, err := t.timeout(r)
|
||||||
if after == nil {
|
if after == nil {
|
||||||
t.handler.ServeHTTP(w, r)
|
t.handler.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
@ -93,13 +95,13 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
case <-done:
|
case <-done:
|
||||||
return
|
return
|
||||||
case <-after:
|
case <-after:
|
||||||
tw.timeout(msg)
|
tw.timeout(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type timeoutWriter interface {
|
type timeoutWriter interface {
|
||||||
http.ResponseWriter
|
http.ResponseWriter
|
||||||
timeout(string)
|
timeout(*apierrors.StatusError)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTimeoutWriter(w http.ResponseWriter) timeoutWriter {
|
func newTimeoutWriter(w http.ResponseWriter) timeoutWriter {
|
||||||
@ -183,7 +185,7 @@ func (tw *baseTimeoutWriter) WriteHeader(code int) {
|
|||||||
tw.w.WriteHeader(code)
|
tw.w.WriteHeader(code)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tw *baseTimeoutWriter) timeout(msg string) {
|
func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) {
|
||||||
tw.mu.Lock()
|
tw.mu.Lock()
|
||||||
defer tw.mu.Unlock()
|
defer tw.mu.Unlock()
|
||||||
|
|
||||||
@ -194,12 +196,8 @@ func (tw *baseTimeoutWriter) timeout(msg string) {
|
|||||||
// handler
|
// handler
|
||||||
if !tw.wroteHeader && !tw.hijacked {
|
if !tw.wroteHeader && !tw.hijacked {
|
||||||
tw.w.WriteHeader(http.StatusGatewayTimeout)
|
tw.w.WriteHeader(http.StatusGatewayTimeout)
|
||||||
if msg != "" {
|
enc := json.NewEncoder(tw.w)
|
||||||
tw.w.Write([]byte(msg))
|
enc.Encode(err)
|
||||||
} else {
|
|
||||||
enc := json.NewEncoder(tw.w)
|
|
||||||
enc.Encode(errors.NewServerTimeout(api.Resource(""), "", 0))
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// The timeout writer has been used by the inner handler. There is
|
// The timeout writer has been used by the inner handler. There is
|
||||||
// no way to timeout the HTTP request at the point. We have to shutdown
|
// no way to timeout the HTTP request at the point. We have to shutdown
|
||||||
|
@ -22,6 +22,10 @@ import (
|
|||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTimeout(t *testing.T) {
|
func TestTimeout(t *testing.T) {
|
||||||
@ -29,7 +33,7 @@ func TestTimeout(t *testing.T) {
|
|||||||
writeErrors := make(chan error, 1)
|
writeErrors := make(chan error, 1)
|
||||||
timeout := make(chan time.Time, 1)
|
timeout := make(chan time.Time, 1)
|
||||||
resp := "test response"
|
resp := "test response"
|
||||||
timeoutResp := "test timeout"
|
timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0)
|
||||||
|
|
||||||
ts := httptest.NewServer(WithTimeout(http.HandlerFunc(
|
ts := httptest.NewServer(WithTimeout(http.HandlerFunc(
|
||||||
func(w http.ResponseWriter, r *http.Request) {
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -37,8 +41,8 @@ func TestTimeout(t *testing.T) {
|
|||||||
_, err := w.Write([]byte(resp))
|
_, err := w.Write([]byte(resp))
|
||||||
writeErrors <- err
|
writeErrors <- err
|
||||||
}),
|
}),
|
||||||
func(*http.Request) (<-chan time.Time, string) {
|
func(*http.Request) (<-chan time.Time, *apierrors.StatusError) {
|
||||||
return timeout, timeoutResp
|
return timeout, timeoutErr
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
@ -69,8 +73,8 @@ func TestTimeout(t *testing.T) {
|
|||||||
t.Errorf("got res.StatusCode %d; expected %d", res.StatusCode, http.StatusServiceUnavailable)
|
t.Errorf("got res.StatusCode %d; expected %d", res.StatusCode, http.StatusServiceUnavailable)
|
||||||
}
|
}
|
||||||
body, _ = ioutil.ReadAll(res.Body)
|
body, _ = ioutil.ReadAll(res.Body)
|
||||||
if string(body) != timeoutResp {
|
if !strings.Contains(string(body), timeoutErr.Error()) {
|
||||||
t.Errorf("got body %q; expected %q", string(body), timeoutResp)
|
t.Errorf("got body %q; expected it to contain %q", string(body), timeoutErr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now try to send a response
|
// Now try to send a response
|
||||||
|
Loading…
Reference in New Issue
Block a user