apiserver: add API server plumbing for adding warnings

This commit is contained in:
Jordan Liggitt 2020-04-14 16:10:54 -04:00
parent b1098bd0d5
commit e5e557e902
9 changed files with 312 additions and 0 deletions

View File

@ -679,6 +679,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
genericfeatures.DryRun: {Default: true, PreRelease: featuregate.GA},
genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
genericfeatures.WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
// features that enable backwards compatibility but are scheduled to be removed
// ...

View File

@ -41,6 +41,7 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c))
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler)

View File

@ -0,0 +1,69 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"net/http"
"sync"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/warning"
)
// WithWarningRecorder attaches a deduplicating k8s.io/apiserver/pkg/warning#WarningRecorder to the request context.
func WithWarningRecorder(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
recorder := &recorder{writer: w}
req = req.WithContext(warning.WithWarningRecorder(req.Context(), recorder))
handler.ServeHTTP(w, req)
})
}
type recorder struct {
lock sync.Mutex
recorded map[string]bool
writer http.ResponseWriter
}
func (r *recorder) AddWarning(agent, text string) {
if len(text) == 0 {
return
}
r.lock.Lock()
defer r.lock.Unlock()
// init if needed
if r.recorded == nil {
r.recorded = map[string]bool{}
}
// dedupe if already warned
if r.recorded[text] {
return
}
r.recorded[text] = true
// TODO(liggitt): track total message characters written:
// * if this takes us over 4k truncate individual messages to 256 chars and regenerate headers
// * if we're already truncating truncate this message to 256 chars
// * if we're still over 4k omit this message
if header, err := net.NewWarningHeader(299, agent, text); err == nil {
r.writer.Header().Add("Warning", header)
}
}

View File

@ -0,0 +1,100 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"net/http/httptest"
"reflect"
"testing"
)
func Test_recorder_AddWarning(t *testing.T) {
type args struct {
agent string
text string
}
tests := []struct {
name string
args []args
expect []string
}{
{
name: "empty",
args: []args{},
expect: nil,
},
{
name: "empty values",
args: []args{{agent: "", text: ""}},
expect: nil,
},
{
name: "empty agent",
args: []args{{agent: "", text: "mytext"}},
expect: []string{`299 - "mytext"`},
},
{
name: "simple",
args: []args{{agent: "myagent", text: "mytext"}},
expect: []string{`299 myagent "mytext"`},
},
{
name: "duplicate text",
args: []args{
{agent: "myagent", text: "mytext"},
{agent: "myagent2", text: "mytext"},
},
expect: []string{`299 myagent "mytext"`},
},
{
name: "multiple",
args: []args{
{agent: "", text: "mytext1"},
{agent: "", text: "mytext2"},
{agent: "", text: "mytext3"},
},
expect: []string{
`299 - "mytext1"`,
`299 - "mytext2"`,
`299 - "mytext3"`,
},
},
{
name: "invalid agent",
args: []args{{agent: "my agent", text: "mytext"}},
expect: nil,
},
{
name: "invalid text",
args: []args{{agent: "myagent", text: "my\ntext"}},
expect: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
responseRecorder := httptest.NewRecorder()
warningRecorder := &recorder{writer: responseRecorder}
for _, arg := range tt.args {
warningRecorder.AddWarning(arg.agent, arg.text)
}
if !reflect.DeepEqual(tt.expect, responseRecorder.Header()["Warning"]) {
t.Errorf("expected\n%#v\ngot\n%#v", tt.expect, responseRecorder.Header()["Warning"])
}
})
}
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
utilwarning "k8s.io/apiserver/pkg/endpoints/warning"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -622,6 +623,10 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
verbOverrider, needOverride := storage.(StorageMetricsOverride)
// accumulate endpoint-level warnings
enableWarningHeaders := utilfeature.DefaultFeatureGate.Enabled(features.WarningHeaders)
warnings := []string{}
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
@ -637,6 +642,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
} else {
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
}
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
doc := "read the specified " + kind
if isSubresource {
@ -667,6 +675,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "list " + subresource + " of objects of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -699,6 +710,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "replace " + subresource + " of the specified " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulUpdateResource(updater, reqScope, admit))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.PUT(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -729,6 +743,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
supportedTypes = append(supportedTypes, string(types.ApplyPatchType))
}
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulPatchResource(patcher, reqScope, admit, supportedTypes))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.PATCH(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -751,6 +768,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
@ -784,6 +804,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
deleteReturnType = producedObject
}
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.DELETE(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -807,6 +830,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "delete collection of " + subresource + " of a " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.DELETE(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -834,6 +860,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
doc += ". deprecated: use the 'watch' parameter with a list operation instead, filtered to a single item with the 'fieldSelector' parameter."
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -854,6 +883,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
doc += ". deprecated: use the 'watch' parameter with a list operation instead."
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -877,6 +909,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "connect " + method + " requests to " + subresource + " of " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulConnectResource(connecter, reqScope, admit, path, isSubresource))
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
route := ws.Method(method).Path(action.Path).
To(handler).
Doc(doc).

View File

@ -0,0 +1,39 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package warning
import (
restful "github.com/emicklei/go-restful"
"k8s.io/apiserver/pkg/warning"
)
// AddWarningsHandler returns a handler that adds the provided warnings to all requests,
// then delegates to the provided handler.
func AddWarningsHandler(handler restful.RouteFunction, warnings []string) restful.RouteFunction {
if len(warnings) == 0 {
return handler
}
return func(req *restful.Request, res *restful.Response) {
ctx := req.Request.Context()
for _, msg := range warnings {
warning.AddWarning(ctx, "", msg)
}
handler(req, res)
}
}

View File

@ -143,6 +143,12 @@ const (
//
// Allows label and field based indexes in apiserver watch cache to accelerate list operations.
SelectorIndex featuregate.Feature = "SelectorIndex"
// owner: @liggitt
// beta: v1.19
//
// Allows sending warning headers in API responses.
WarningHeaders featuregate.Feature = "WarningHeaders"
)
func init() {
@ -168,4 +174,5 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
RemoveSelfLink: {Default: false, PreRelease: featuregate.Alpha},
SelectorIndex: {Default: false, PreRelease: featuregate.Alpha},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
}

View File

@ -678,6 +678,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler)
return handler

View File

@ -0,0 +1,59 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package warning
import (
"context"
)
// The key type is unexported to prevent collisions
type key int
const (
// auditAnnotationsKey is the context key for the audit annotations.
warningRecorderKey key = iota
)
// Recorder provides a method for recording warnings
type Recorder interface {
// AddWarning adds the specified warning to the response.
// agent must be valid UTF-8, and must not contain spaces, quotes, backslashes, or control characters.
// text must be valid UTF-8, and must not contain control characters.
AddWarning(agent, text string)
}
// WithWarningRecorder returns a new context that wraps the provided context and contains the provided Recorder implementation.
// The returned context can be passed to AddWarning().
func WithWarningRecorder(ctx context.Context, recorder Recorder) context.Context {
return context.WithValue(ctx, warningRecorderKey, recorder)
}
func warningRecorderFrom(ctx context.Context) (Recorder, bool) {
recorder, ok := ctx.Value(warningRecorderKey).(Recorder)
return recorder, ok
}
// AddWarning records a warning for the specified agent and text to the Recorder added to the provided context using WithWarningRecorder().
// If no Recorder exists in the provided context, this is a no-op.
// agent must be valid UTF-8, and must not contain spaces, quotes, backslashes, or control characters.
// text must be valid UTF-8, and must not contain control characters.
func AddWarning(ctx context.Context, agent string, text string) {
recorder, ok := warningRecorderFrom(ctx)
if !ok {
return
}
recorder.AddWarning(agent, text)
}