From a872c6826cd9fb6ce7384f8b6242e261e138abc2 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 4 May 2019 18:21:53 -0400 Subject: [PATCH 1/2] Set API compression feature gate to Beta --- pkg/features/kube_features.go | 2 +- staging/src/k8s.io/apiserver/pkg/features/kube_features.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 57974962e19..fbb2f28302e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -529,7 +529,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS genericfeatures.ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, genericfeatures.DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha}, - genericfeatures.APIResponseCompression: {Default: false, PreRelease: featuregate.Alpha}, + genericfeatures.APIResponseCompression: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.APIListChunking: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.DryRun: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.ServerSideApply: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 3cd4f7f8f5a..5a162b0ff7f 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -145,7 +145,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta}, AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha}, - APIResponseCompression: {Default: false, PreRelease: featuregate.Alpha}, + APIResponseCompression: {Default: true, PreRelease: featuregate.Beta}, APIListChunking: {Default: true, PreRelease: featuregate.Beta}, DryRun: {Default: true, PreRelease: featuregate.Beta}, RemainingItemCount: {Default: false, PreRelease: featuregate.Alpha}, From 4ed2b9875d0498b5c577095075bda341e96fcec2 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 4 May 2019 17:36:36 -0400 Subject: [PATCH 2/2] Replace HTTP compression with an inline handler The previous HTTP compression implementation functioned as a filter, which required it to deal with a number of special cases that complicated the implementation. Instead, when we write an API object to a response, handle only that one case. This will allow a more limited implementation that does not impact other code flows. Also, to prevent excessive CPU use on small objects, compression is disabled on responses smaller than 128Kb in size. --- .../src/k8s.io/apiserver/pkg/endpoints/BUILD | 2 - .../apiserver/pkg/endpoints/apiserver_test.go | 54 +++- .../apiserver/pkg/endpoints/groupversion.go | 13 +- .../endpoints/handlers/responsewriters/BUILD | 7 + .../handlers/responsewriters/writers.go | 185 ++++++++--- .../handlers/responsewriters/writers_test.go | 303 ++++++++++++++++++ .../apiserver/pkg/endpoints/installer.go | 14 +- staging/src/k8s.io/apiserver/pkg/server/BUILD | 2 - .../src/k8s.io/apiserver/pkg/server/config.go | 14 +- .../k8s.io/apiserver/pkg/server/filters/BUILD | 3 - .../pkg/server/filters/compression.go | 181 ----------- .../pkg/server/filters/compression_test.go | 106 ------ .../apiserver/pkg/server/genericapiserver.go | 7 +- 13 files changed, 511 insertions(+), 380 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go delete mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/compression.go delete mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD index 87eb9cc4de3..820223680b1 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD @@ -52,7 +52,6 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/endpoints/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", @@ -88,7 +87,6 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/github.com/emicklei/go-restful:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 91ee89abe8e..481d805ea8f 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -71,7 +71,6 @@ import ( genericapitesting "k8s.io/apiserver/pkg/endpoints/testing" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/apiserver/pkg/server/filters" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" ) @@ -1219,7 +1218,12 @@ func TestListCompression(t *testing.T) { } for i, testCase := range testCases { storage := map[string]rest.Storage{} - simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace} + simpleStorage := SimpleRESTStorage{ + expectedResourceNamespace: testCase.namespace, + list: []genericapitesting.Simple{ + {Other: strings.Repeat("0123456789abcdef", (128*1024/16)+1)}, + }, + } storage["simple"] = &simpleStorage selfLinker := &setTestSelfLinker{ t: t, @@ -1228,7 +1232,6 @@ func TestListCompression(t *testing.T) { } var handler = handleInternal(storage, admissionControl, selfLinker, nil) - handler = filters.WithCompression(handler) handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver()) server := httptest.NewServer(handler) @@ -1656,13 +1659,53 @@ func BenchmarkGet(b *testing.B) { b.StopTimer() } -func TestGetCompression(t *testing.T) { +func BenchmarkGetNoCompression(b *testing.B) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{ item: genericapitesting.Simple{ Other: "foo", }, } + selfLinker := &setTestSelfLinker{ + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id", + name: "id", + namespace: "default", + } + storage["simple"] = &simpleStorage + handler := handleLinker(storage, selfLinker) + server := httptest.NewServer(handler) + defer server.Close() + + client := &http.Client{ + Transport: &http.Transport{ + DisableCompression: true, + }, + } + + u := server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id" + + b.ResetTimer() + for i := 0; i < b.N; i++ { + resp, err := client.Get(u) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + b.Fatalf("unexpected response: %#v", resp) + } + if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil { + b.Fatalf("unable to read body") + } + } + b.StopTimer() +} +func TestGetCompression(t *testing.T) { + storage := map[string]rest.Storage{} + simpleStorage := SimpleRESTStorage{ + item: genericapitesting.Simple{ + Other: strings.Repeat("0123456789abcdef", (128*1024/16)+1), + }, + } selfLinker := &setTestSelfLinker{ t: t, expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id", @@ -1672,7 +1715,6 @@ func TestGetCompression(t *testing.T) { storage["simple"] = &simpleStorage handler := handleLinker(storage, selfLinker) - handler = filters.WithCompression(handler) handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver()) server := httptest.NewServer(handler) defer server.Close() @@ -1687,7 +1729,7 @@ func TestGetCompression(t *testing.T) { for _, test := range tests { req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil) if err != nil { - t.Fatalf("unexpected error cretaing request: %v", err) + t.Fatalf("unexpected error creating request: %v", err) } // It's necessary to manually set Accept-Encoding here // to prevent http.DefaultClient from automatically diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go index e624f0f91a9..920369977a9 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go @@ -20,7 +20,7 @@ import ( "path" "time" - "github.com/emicklei/go-restful" + restful "github.com/emicklei/go-restful" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -83,10 +83,6 @@ type APIGroupVersion struct { MinRequestTimeout time.Duration - // EnableAPIResponseCompression indicates whether API Responses should support compression - // if the client requests it via Accept-Encoding - EnableAPIResponseCompression bool - // OpenAPIModels exposes the OpenAPI models to each individual handler. OpenAPIModels openapiproto.Models @@ -101,10 +97,9 @@ type APIGroupVersion struct { func (g *APIGroupVersion) InstallREST(container *restful.Container) error { prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ - group: g, - prefix: prefix, - minRequestTimeout: g.MinRequestTimeout, - enableAPIResponseCompression: g.EnableAPIResponseCompression, + group: g, + prefix: prefix, + minRequestTimeout: g.MinRequestTimeout, } apiResources, ws, registrationErrors := installer.Install() diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/BUILD index 17033d5e8b0..90427ce85d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/BUILD @@ -11,6 +11,7 @@ go_test( srcs = [ "errors_test.go", "status_test.go", + "writers_test.go", ], embed = [":go_default_library"], deps = [ @@ -19,9 +20,13 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", ], ) @@ -46,8 +51,10 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flushwriter:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/wsstream:go_default_library", ], diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index 472258bb0d8..b7c59cfc54d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -17,11 +17,17 @@ limitations under the License. package responsewriters import ( + "compress/gzip" "encoding/json" "fmt" "io" + "io/ioutil" "net/http" "strconv" + "strings" + "sync" + + "k8s.io/apiserver/pkg/features" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -31,30 +37,11 @@ import ( "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/flushwriter" "k8s.io/apiserver/pkg/util/wsstream" ) -// httpResponseWriterWithInit wraps http.ResponseWriter, and implements the io.Writer interface to be used -// with encoding. The purpose is to allow for encoding to a stream, while accommodating a custom HTTP status code -// if encoding fails, and meeting the encoder's io.Writer interface requirement. -type httpResponseWriterWithInit struct { - hasWritten bool - mediaType string - statusCode int - innerW http.ResponseWriter -} - -func (w httpResponseWriterWithInit) Write(b []byte) (n int, err error) { - if !w.hasWritten { - w.innerW.Header().Set("Content-Type", w.mediaType) - w.innerW.WriteHeader(w.statusCode) - w.hasWritten = true - } - - return w.innerW.Write(b) -} - // StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response. // If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many // browser clients cannot easily handle binary streaming protocols). @@ -96,15 +83,142 @@ func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSe } // SerializeObject renders an object in the content type negotiated by the client using the provided encoder. -// The context is optional and can be nil. -func SerializeObject(mediaType string, encoder runtime.Encoder, innerW http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { - w := httpResponseWriterWithInit{mediaType: mediaType, innerW: innerW, statusCode: statusCode} - - if err := encoder.Encode(object, w); err != nil { - errSerializationFatal(err, encoder, w) +// The context is optional and can be nil. This method will perform optional content compression if requested by +// a client and the feature gate for APIResponseCompression is enabled. +func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { + w := &deferredResponseWriter{ + mediaType: mediaType, + statusCode: statusCode, + contentEncoding: negotiateContentEncoding(req), + hw: hw, } + + err := encoder.Encode(object, w) + if err == nil { + err = w.Close() + if err == nil { + return + } + } + + // make a best effort to write the object if a failure is detected + utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err)) + status := ErrorToAPIStatus(err) + candidateStatusCode := int(status.Code) + // if the current status code is successful, allow the error's status code to overwrite it + if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest { + w.statusCode = candidateStatusCode + } + output, err := runtime.Encode(encoder, status) + if err != nil { + w.mediaType = "text/plain" + output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message)) + } + if _, err := w.Write(output); err != nil { + utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a fallback JSON response: %v", err)) + } + w.Close() } +var gzipPool = &sync.Pool{ + New: func() interface{} { + gw, err := gzip.NewWriterLevel(nil, defaultGzipContentEncodingLevel) + if err != nil { + panic(err) + } + return gw + }, +} + +const ( + // defaultGzipContentEncodingLevel is set to 4 which uses less CPU than the default level + defaultGzipContentEncodingLevel = 4 + // defaultGzipThresholdBytes is compared to the size of the first write from the stream + // (usually the entire object), and if the size is smaller no gzipping will be performed + // if the client requests it. + defaultGzipThresholdBytes = 128 * 1024 +) + +// negotiateContentEncoding returns a supported client-requested content encoding for the +// provided request. It will return the empty string if no supported content encoding was +// found or if response compression is disabled. +func negotiateContentEncoding(req *http.Request) string { + encoding := req.Header.Get("Accept-Encoding") + if len(encoding) == 0 { + return "" + } + if !utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression) { + return "" + } + for len(encoding) > 0 { + var token string + if next := strings.Index(encoding, ","); next != -1 { + token = encoding[:next] + encoding = encoding[next+1:] + } else { + token = encoding + encoding = "" + } + switch strings.TrimSpace(token) { + case "gzip": + return "gzip" + } + } + return "" +} + +type deferredResponseWriter struct { + mediaType string + statusCode int + contentEncoding string + + hasWritten bool + hw http.ResponseWriter + w io.Writer +} + +func (w *deferredResponseWriter) Write(p []byte) (n int, err error) { + if w.hasWritten { + return w.w.Write(p) + } + w.hasWritten = true + + hw := w.hw + header := hw.Header() + switch { + case w.contentEncoding == "gzip" && len(p) > defaultGzipThresholdBytes: + header.Set("Content-Encoding", "gzip") + header.Add("Vary", "Accept-Encoding") + + gw := gzipPool.Get().(*gzip.Writer) + gw.Reset(hw) + + w.w = gw + default: + w.w = hw + } + + header.Set("Content-Type", w.mediaType) + hw.WriteHeader(w.statusCode) + return w.w.Write(p) +} + +func (w *deferredResponseWriter) Close() error { + if !w.hasWritten { + return nil + } + var err error + switch t := w.w.(type) { + case *gzip.Writer: + err = t.Close() + t.Reset(nil) + gzipPool.Put(t) + } + return err +} + +var nopCloser = ioutil.NopCloser(nil) + // WriteObjectNegotiated renders an object in the content type negotiated by the client. func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { stream, ok := object.(rest.ResourceStreamer) @@ -157,25 +271,6 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV return code } -// errSerializationFatal renders an error to the response, and if codec fails will render plaintext. -// Returns the HTTP status code of the error. -func errSerializationFatal(err error, codec runtime.Encoder, w httpResponseWriterWithInit) { - utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err)) - status := ErrorToAPIStatus(err) - candidateStatusCode := int(status.Code) - // If original statusCode was not successful, we need to return the original error. - // We cannot hide it behind serialization problems - if w.statusCode >= http.StatusOK && w.statusCode < http.StatusBadRequest { - w.statusCode = candidateStatusCode - } - output, err := runtime.Encode(codec, status) - if err != nil { - w.mediaType = "text/plain" - output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message)) - } - w.Write(output) -} - // WriteRawJSON writes a non-API object in JSON. func WriteRawJSON(statusCode int, object interface{}, w http.ResponseWriter) { output, err := json.MarshalIndent(object, "", " ") diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go new file mode 100644 index 00000000000..31bea58c0ed --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -0,0 +1,303 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package responsewriters + +import ( + "bytes" + "compress/gzip" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" +) + +func TestSerializeObject(t *testing.T) { + smallPayload := []byte("{test-object,test-object}") + largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1) + tests := []struct { + name string + + compressionEnabled bool + + mediaType string + out []byte + outErrs []error + req *http.Request + statusCode int + object runtime.Object + + wantCode int + wantHeaders http.Header + wantBody []byte + }{ + { + name: "serialize object", + out: smallPayload, + req: &http.Request{Header: http.Header{}}, + wantCode: http.StatusOK, + wantHeaders: http.Header{"Content-Type": []string{""}}, + wantBody: smallPayload, + }, + + { + name: "return content type", + out: smallPayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{}}, + wantCode: http.StatusOK, + wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, + wantBody: smallPayload, + }, + + { + name: "return status code", + statusCode: http.StatusBadRequest, + out: smallPayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{}}, + wantCode: http.StatusBadRequest, + wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, + wantBody: smallPayload, + }, + + { + name: "fail to encode object", + out: smallPayload, + outErrs: []error{fmt.Errorf("bad")}, + mediaType: "application/json", + req: &http.Request{Header: http.Header{}}, + wantCode: http.StatusInternalServerError, + wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, + wantBody: smallPayload, + }, + + { + name: "fail to encode object or status", + out: smallPayload, + outErrs: []error{fmt.Errorf("bad"), fmt.Errorf("bad2")}, + mediaType: "application/json", + req: &http.Request{Header: http.Header{}}, + wantCode: http.StatusInternalServerError, + wantHeaders: http.Header{"Content-Type": []string{"text/plain"}}, + wantBody: []byte(": bad"), + }, + + { + name: "fail to encode object or status with status code", + out: smallPayload, + outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, + mediaType: "application/json", + req: &http.Request{Header: http.Header{}}, + statusCode: http.StatusOK, + wantCode: http.StatusNotFound, + wantHeaders: http.Header{"Content-Type": []string{"text/plain"}}, + wantBody: []byte("NotFound: \"test\" not found"), + }, + + { + name: "fail to encode object or status with status code and keeps previous error", + out: smallPayload, + outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, + mediaType: "application/json", + req: &http.Request{Header: http.Header{}}, + statusCode: http.StatusNotAcceptable, + wantCode: http.StatusNotAcceptable, + wantHeaders: http.Header{"Content-Type": []string{"text/plain"}}, + wantBody: []byte("NotFound: \"test\" not found"), + }, + + { + name: "compression requires feature gate", + out: largePayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, + wantBody: largePayload, + }, + + { + name: "compress on gzip", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel), + }, + + { + name: "compression is not performed on small objects", + compressionEnabled: true, + out: smallPayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + }, + wantBody: smallPayload, + }, + + { + name: "compress when multiple encodings are requested", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"deflate, , gzip,"}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel), + }, + + { + name: "ignore compression on deflate", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"deflate"}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + }, + wantBody: largePayload, + }, + + { + name: "ignore compression on unrecognized types", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{", , other, nothing, what, "}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + }, + wantBody: largePayload, + }, + + { + name: "errors are compressed", + compressionEnabled: true, + statusCode: http.StatusInternalServerError, + out: smallPayload, + outErrs: []error{fmt.Errorf(string(largePayload)), fmt.Errorf("bad2")}, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }}, + wantCode: http.StatusInternalServerError, + wantHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + wantBody: gzipContent([]byte(": "+string(largePayload)), defaultGzipContentEncodingLevel), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, tt.compressionEnabled)() + + encoder := &fakeEncoder{ + buf: tt.out, + errs: tt.outErrs, + } + if tt.statusCode == 0 { + tt.statusCode = http.StatusOK + } + recorder := httptest.NewRecorder() + SerializeObject(tt.mediaType, encoder, recorder, tt.req, tt.statusCode, tt.object) + result := recorder.Result() + if result.StatusCode != tt.wantCode { + t.Fatalf("unexpected code: %v", result.StatusCode) + } + if !reflect.DeepEqual(result.Header, tt.wantHeaders) { + t.Fatal(diff.ObjectReflectDiff(tt.wantHeaders, result.Header)) + } + body, _ := ioutil.ReadAll(result.Body) + if !bytes.Equal(tt.wantBody, body) { + t.Fatalf("wanted:\n%s\ngot:\n%s", hex.Dump(tt.wantBody), hex.Dump(body)) + } + }) + } +} + +type fakeEncoder struct { + obj runtime.Object + buf []byte + errs []error +} + +func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error { + e.obj = obj + if len(e.errs) > 0 { + err := e.errs[0] + e.errs = e.errs[1:] + return err + } + _, err := w.Write(e.buf) + return err +} + +func gzipContent(data []byte, level int) []byte { + buf := &bytes.Buffer{} + gw, err := gzip.NewWriterLevel(buf, level) + if err != nil { + panic(err) + } + if _, err := gw.Write(data); err != nil { + panic(err) + } + if err := gw.Close(); err != nil { + panic(err) + } + return buf.Bytes() +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index d8260e88dc5..144fd44e2e1 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -40,7 +40,6 @@ import ( "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" - genericfilters "k8s.io/apiserver/pkg/server/filters" utilfeature "k8s.io/apiserver/pkg/util/feature" ) @@ -50,10 +49,9 @@ const ( ) type APIInstaller struct { - group *APIGroupVersion - prefix string // Path prefix where API resources are to be registered. - minRequestTimeout time.Duration - enableAPIResponseCompression bool + group *APIGroupVersion + prefix string // Path prefix where API resources are to be registered. + minRequestTimeout time.Duration } // Struct capturing information about an action ("GET", "POST", "WATCH", "PROXY", etc). @@ -630,9 +628,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler) } - if a.enableAPIResponseCompression { - handler = genericfilters.RestfulWithCompression(handler) - } doc := "read the specified " + kind if isSubresource { doc = "read " + subresource + " of the specified " + kind @@ -662,9 +657,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag doc = "list " + subresource + " of objects of kind " + kind } handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) - if a.enableAPIResponseCompression { - handler = genericfilters.RestfulWithCompression(handler) - } route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index 16e427ce3c0..43bf3731c4a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -95,7 +95,6 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library", @@ -103,7 +102,6 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 2ded9d9849f..6a1efaeec5c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -54,13 +54,11 @@ import ( genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi" apirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/features" genericregistry "k8s.io/apiserver/pkg/registry/generic" genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" serverstore "k8s.io/apiserver/pkg/server/storage" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" restclient "k8s.io/client-go/rest" certutil "k8s.io/client-go/util/cert" @@ -181,10 +179,6 @@ type Config struct { // Predicate which is true for paths of long-running http requests LongRunningFunc apirequest.LongRunningRequestCheck - // EnableAPIResponseCompression indicates whether API Responses should support compression - // if the client requests it via Accept-Encoding - EnableAPIResponseCompression bool - // MergedResourceConfig indicates which groupVersion enabled and its resources enabled/disabled. // This is composed of genericapiserver defaultAPIResourceConfig and those parsed from flags. // If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig. @@ -298,8 +292,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // proto when persisted in etcd. Assuming the upper bound of // the size ratio is 10:1, we set 100MB as the largest request // body size to be accepted and decoded in a write request. - MaxRequestBodyBytes: int64(100 * 1024 * 1024), - EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression), + MaxRequestBodyBytes: int64(100 * 1024 * 1024), // Default to treating watch as a long-running operation // Generic API servers have no inherent long-running subresources @@ -511,9 +504,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer), - enableAPIResponseCompression: c.EnableAPIResponseCompression, - maxRequestBodyBytes: c.MaxRequestBodyBytes, - healthzClock: clock.RealClock{}, + maxRequestBodyBytes: c.MaxRequestBodyBytes, + healthzClock: clock.RealClock{}, } for { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index 44b669deb5a..cab3066bc74 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -9,7 +9,6 @@ load( go_test( name = "go_default_test", srcs = [ - "compression_test.go", "content_type_test.go", "cors_test.go", "maxinflight_test.go", @@ -32,7 +31,6 @@ go_test( go_library( name = "go_default_library", srcs = [ - "compression.go", "content_type.go", "cors.go", "doc.go", @@ -55,7 +53,6 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library", - "//vendor/github.com/emicklei/go-restful:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/compression.go b/staging/src/k8s.io/apiserver/pkg/server/filters/compression.go deleted file mode 100644 index 625cd5c8d3a..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/compression.go +++ /dev/null @@ -1,181 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package filters - -import ( - "compress/gzip" - "compress/zlib" - "errors" - "fmt" - "io" - "net/http" - "strings" - - "github.com/emicklei/go-restful" - - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apiserver/pkg/endpoints/request" -) - -// Compressor is an interface to compression writers -type Compressor interface { - io.WriteCloser - Flush() error -} - -const ( - headerAcceptEncoding = "Accept-Encoding" - headerContentEncoding = "Content-Encoding" - - encodingGzip = "gzip" - encodingDeflate = "deflate" -) - -// WithCompression wraps an http.Handler with the Compression Handler -func WithCompression(handler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - wantsCompression, encoding := wantsCompressedResponse(req) - w.Header().Set("Vary", "Accept-Encoding") - if wantsCompression { - compressionWriter, err := NewCompressionResponseWriter(w, encoding) - if err != nil { - handleError(w, req, err) - runtime.HandleError(fmt.Errorf("failed to compress HTTP response: %v", err)) - return - } - compressionWriter.Header().Set("Content-Encoding", encoding) - handler.ServeHTTP(compressionWriter, req) - compressionWriter.(*compressionResponseWriter).Close() - } else { - handler.ServeHTTP(w, req) - } - }) -} - -// wantsCompressedResponse reads the Accept-Encoding header to see if and which encoding is requested. -func wantsCompressedResponse(req *http.Request) (bool, string) { - // don't compress watches - ctx := req.Context() - info, ok := request.RequestInfoFrom(ctx) - if !ok { - return false, "" - } - if !info.IsResourceRequest { - return false, "" - } - if info.Verb == "watch" { - return false, "" - } - header := req.Header.Get(headerAcceptEncoding) - gi := strings.Index(header, encodingGzip) - zi := strings.Index(header, encodingDeflate) - // use in order of appearance - switch { - case gi == -1: - return zi != -1, encodingDeflate - case zi == -1: - return gi != -1, encodingGzip - case gi < zi: - return true, encodingGzip - default: - return true, encodingDeflate - } -} - -type compressionResponseWriter struct { - writer http.ResponseWriter - compressor Compressor - encoding string -} - -// NewCompressionResponseWriter returns wraps w with a compression ResponseWriter, using the given encoding -func NewCompressionResponseWriter(w http.ResponseWriter, encoding string) (http.ResponseWriter, error) { - var compressor Compressor - switch encoding { - case encodingGzip: - compressor = gzip.NewWriter(w) - case encodingDeflate: - compressor = zlib.NewWriter(w) - default: - return nil, fmt.Errorf("%s is not a supported encoding type", encoding) - } - return &compressionResponseWriter{ - writer: w, - compressor: compressor, - encoding: encoding, - }, nil -} - -// compressionResponseWriter implements http.Responsewriter Interface -var _ http.ResponseWriter = &compressionResponseWriter{} - -func (c *compressionResponseWriter) Header() http.Header { - return c.writer.Header() -} - -// compress data according to compression method -func (c *compressionResponseWriter) Write(p []byte) (int, error) { - if c.compressorClosed() { - return -1, errors.New("compressing error: tried to write data using closed compressor") - } - c.Header().Set(headerContentEncoding, c.encoding) - defer c.compressor.Flush() - return c.compressor.Write(p) -} - -func (c *compressionResponseWriter) WriteHeader(status int) { - c.writer.WriteHeader(status) -} - -// CloseNotify is part of http.CloseNotifier interface -func (c *compressionResponseWriter) CloseNotify() <-chan bool { - return c.writer.(http.CloseNotifier).CloseNotify() -} - -// Close the underlying compressor -func (c *compressionResponseWriter) Close() error { - if c.compressorClosed() { - return errors.New("Compressing error: tried to close already closed compressor") - } - - c.compressor.Close() - c.compressor = nil - return nil -} - -func (c *compressionResponseWriter) Flush() { - if c.compressorClosed() { - return - } - c.compressor.Flush() -} - -func (c *compressionResponseWriter) compressorClosed() bool { - return nil == c.compressor -} - -// RestfulWithCompression wraps WithCompression to be compatible with go-restful -func RestfulWithCompression(function restful.RouteFunction) restful.RouteFunction { - return restful.RouteFunction(func(request *restful.Request, response *restful.Response) { - handler := WithCompression(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - response.ResponseWriter = w - request.Request = req - function(request, response) - })) - handler.ServeHTTP(response.ResponseWriter, request.Request) - }) -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go deleted file mode 100644 index b179cff8ee7..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go +++ /dev/null @@ -1,106 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package filters - -import ( - "bytes" - "compress/gzip" - "io" - "io/ioutil" - "net/http" - "net/http/httptest" - "testing" - - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apiserver/pkg/endpoints/filters" - "k8s.io/apiserver/pkg/endpoints/request" -) - -func TestCompression(t *testing.T) { - tests := []struct { - encoding string - watch bool - }{ - {"", false}, - {"gzip", true}, - {"gzip", false}, - } - - responseData := []byte("1234") - - for _, test := range tests { - handler := WithCompression( - http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.Write(responseData) - }), - ) - handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver()) - server := httptest.NewServer(handler) - defer server.Close() - client := http.Client{ - Transport: &http.Transport{ - DisableCompression: true, - }, - } - - url := server.URL + "/api/v1/pods" - if test.watch { - url = url + "?watch=1" - } - request, err := http.NewRequest("GET", url, nil) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - request.Header.Set("Accept-Encoding", test.encoding) - response, err := client.Do(request) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - var reader io.Reader - if test.encoding == "gzip" && !test.watch { - if response.Header.Get("Content-Encoding") != "gzip" { - t.Fatal("expected response header Content-Encoding to be set to \"gzip\"") - } - if response.Header.Get("Vary") != "Accept-Encoding" { - t.Fatal("expected response header Vary to be set to \"Accept-Encoding\"") - } - reader, err = gzip.NewReader(response.Body) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } else { - if response.Header.Get("Content-Encoding") == "gzip" { - t.Fatal("expected response header Content-Encoding not to be set") - } - reader = response.Body - } - body, err := ioutil.ReadAll(reader) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if !bytes.Equal(body, responseData) { - t.Fatalf("Expected response body %s to equal %s", body, responseData) - } - } -} - -func newTestRequestInfoResolver() *request.RequestInfoFactory { - return &request.RequestInfoFactory{ - APIPrefixes: sets.NewString("api", "apis"), - GrouplessAPIPrefixes: sets.NewString("api"), - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 5ca0f009a8f..87b6370fd0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -485,10 +485,9 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV EquivalentResourceRegistry: s.EquivalentResourceRegistry, - Admit: s.admissionControl, - MinRequestTimeout: s.minRequestTimeout, - EnableAPIResponseCompression: s.enableAPIResponseCompression, - Authorizer: s.Authorizer, + Admit: s.admissionControl, + MinRequestTimeout: s.minRequestTimeout, + Authorizer: s.Authorizer, } }