Merge pull request #77449 from smarterclayton/compress_2

Replace HTTP compression with a more scoped impl, only use on responses > 128KB
This commit is contained in:
Kubernetes Prow Robot 2019-07-09 01:54:03 -07:00 committed by GitHub
commit 7c7d70bc7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 513 additions and 382 deletions

View File

@ -543,7 +543,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
genericfeatures.ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
genericfeatures.DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha},
genericfeatures.APIResponseCompression: {Default: false, PreRelease: featuregate.Alpha},
genericfeatures.APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.APIListChunking: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.DryRun: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.ServerSideApply: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -52,7 +52,6 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
@ -88,7 +87,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library",

View File

@ -71,7 +71,6 @@ import (
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/filters"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
@ -1219,7 +1218,12 @@ func TestListCompression(t *testing.T) {
}
for i, testCase := range testCases {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace}
simpleStorage := SimpleRESTStorage{
expectedResourceNamespace: testCase.namespace,
list: []genericapitesting.Simple{
{Other: strings.Repeat("0123456789abcdef", (128*1024/16)+1)},
},
}
storage["simple"] = &simpleStorage
selfLinker := &setTestSelfLinker{
t: t,
@ -1228,7 +1232,6 @@ func TestListCompression(t *testing.T) {
}
var handler = handleInternal(storage, admissionControl, selfLinker, nil)
handler = filters.WithCompression(handler)
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver())
server := httptest.NewServer(handler)
@ -1656,13 +1659,53 @@ func BenchmarkGet(b *testing.B) {
b.StopTimer()
}
func TestGetCompression(t *testing.T) {
func BenchmarkGetNoCompression(b *testing.B) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: genericapitesting.Simple{
Other: "foo",
},
}
selfLinker := &setTestSelfLinker{
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
name: "id",
namespace: "default",
}
storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
server := httptest.NewServer(handler)
defer server.Close()
client := &http.Client{
Transport: &http.Transport{
DisableCompression: true,
},
}
u := server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id"
b.ResetTimer()
for i := 0; i < b.N; i++ {
resp, err := client.Get(u)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
b.Fatalf("unexpected response: %#v", resp)
}
if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
b.Fatalf("unable to read body")
}
}
b.StopTimer()
}
func TestGetCompression(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: genericapitesting.Simple{
Other: strings.Repeat("0123456789abcdef", (128*1024/16)+1),
},
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
@ -1672,7 +1715,6 @@ func TestGetCompression(t *testing.T) {
storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
handler = filters.WithCompression(handler)
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver())
server := httptest.NewServer(handler)
defer server.Close()
@ -1687,7 +1729,7 @@ func TestGetCompression(t *testing.T) {
for _, test := range tests {
req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil)
if err != nil {
t.Fatalf("unexpected error cretaing request: %v", err)
t.Fatalf("unexpected error creating request: %v", err)
}
// It's necessary to manually set Accept-Encoding here
// to prevent http.DefaultClient from automatically

View File

@ -20,7 +20,7 @@ import (
"path"
"time"
"github.com/emicklei/go-restful"
restful "github.com/emicklei/go-restful"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -83,10 +83,6 @@ type APIGroupVersion struct {
MinRequestTimeout time.Duration
// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool
// OpenAPIModels exposes the OpenAPI models to each individual handler.
OpenAPIModels openapiproto.Models
@ -101,10 +97,9 @@ type APIGroupVersion struct {
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
enableAPIResponseCompression: g.EnableAPIResponseCompression,
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, ws, registrationErrors := installer.Install()

View File

@ -11,6 +11,7 @@ go_test(
srcs = [
"errors_test.go",
"status_test.go",
"writers_test.go",
],
embed = [":go_default_library"],
deps = [
@ -19,9 +20,13 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
],
)
@ -46,8 +51,10 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/wsstream:go_default_library",
],

View File

@ -17,11 +17,17 @@ limitations under the License.
package responsewriters
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"k8s.io/apiserver/pkg/features"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -31,30 +37,11 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flushwriter"
"k8s.io/apiserver/pkg/util/wsstream"
)
// httpResponseWriterWithInit wraps http.ResponseWriter, and implements the io.Writer interface to be used
// with encoding. The purpose is to allow for encoding to a stream, while accommodating a custom HTTP status code
// if encoding fails, and meeting the encoder's io.Writer interface requirement.
type httpResponseWriterWithInit struct {
hasWritten bool
mediaType string
statusCode int
innerW http.ResponseWriter
}
func (w httpResponseWriterWithInit) Write(b []byte) (n int, err error) {
if !w.hasWritten {
w.innerW.Header().Set("Content-Type", w.mediaType)
w.innerW.WriteHeader(w.statusCode)
w.hasWritten = true
}
return w.innerW.Write(b)
}
// StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response.
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
// browser clients cannot easily handle binary streaming protocols).
@ -96,15 +83,142 @@ func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSe
}
// SerializeObject renders an object in the content type negotiated by the client using the provided encoder.
// The context is optional and can be nil.
func SerializeObject(mediaType string, encoder runtime.Encoder, innerW http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
w := httpResponseWriterWithInit{mediaType: mediaType, innerW: innerW, statusCode: statusCode}
if err := encoder.Encode(object, w); err != nil {
errSerializationFatal(err, encoder, w)
// The context is optional and can be nil. This method will perform optional content compression if requested by
// a client and the feature gate for APIResponseCompression is enabled.
func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
w := &deferredResponseWriter{
mediaType: mediaType,
statusCode: statusCode,
contentEncoding: negotiateContentEncoding(req),
hw: hw,
}
err := encoder.Encode(object, w)
if err == nil {
err = w.Close()
if err == nil {
return
}
}
// make a best effort to write the object if a failure is detected
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := ErrorToAPIStatus(err)
candidateStatusCode := int(status.Code)
// if the current status code is successful, allow the error's status code to overwrite it
if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest {
w.statusCode = candidateStatusCode
}
output, err := runtime.Encode(encoder, status)
if err != nil {
w.mediaType = "text/plain"
output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message))
}
if _, err := w.Write(output); err != nil {
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a fallback JSON response: %v", err))
}
w.Close()
}
var gzipPool = &sync.Pool{
New: func() interface{} {
gw, err := gzip.NewWriterLevel(nil, defaultGzipContentEncodingLevel)
if err != nil {
panic(err)
}
return gw
},
}
const (
// defaultGzipContentEncodingLevel is set to 4 which uses less CPU than the default level
defaultGzipContentEncodingLevel = 4
// defaultGzipThresholdBytes is compared to the size of the first write from the stream
// (usually the entire object), and if the size is smaller no gzipping will be performed
// if the client requests it.
defaultGzipThresholdBytes = 128 * 1024
)
// negotiateContentEncoding returns a supported client-requested content encoding for the
// provided request. It will return the empty string if no supported content encoding was
// found or if response compression is disabled.
func negotiateContentEncoding(req *http.Request) string {
encoding := req.Header.Get("Accept-Encoding")
if len(encoding) == 0 {
return ""
}
if !utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression) {
return ""
}
for len(encoding) > 0 {
var token string
if next := strings.Index(encoding, ","); next != -1 {
token = encoding[:next]
encoding = encoding[next+1:]
} else {
token = encoding
encoding = ""
}
switch strings.TrimSpace(token) {
case "gzip":
return "gzip"
}
}
return ""
}
type deferredResponseWriter struct {
mediaType string
statusCode int
contentEncoding string
hasWritten bool
hw http.ResponseWriter
w io.Writer
}
func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
if w.hasWritten {
return w.w.Write(p)
}
w.hasWritten = true
hw := w.hw
header := hw.Header()
switch {
case w.contentEncoding == "gzip" && len(p) > defaultGzipThresholdBytes:
header.Set("Content-Encoding", "gzip")
header.Add("Vary", "Accept-Encoding")
gw := gzipPool.Get().(*gzip.Writer)
gw.Reset(hw)
w.w = gw
default:
w.w = hw
}
header.Set("Content-Type", w.mediaType)
hw.WriteHeader(w.statusCode)
return w.w.Write(p)
}
func (w *deferredResponseWriter) Close() error {
if !w.hasWritten {
return nil
}
var err error
switch t := w.w.(type) {
case *gzip.Writer:
err = t.Close()
t.Reset(nil)
gzipPool.Put(t)
}
return err
}
var nopCloser = ioutil.NopCloser(nil)
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
stream, ok := object.(rest.ResourceStreamer)
@ -157,25 +271,6 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
// errSerializationFatal renders an error to the response, and if codec fails will render plaintext.
// Returns the HTTP status code of the error.
func errSerializationFatal(err error, codec runtime.Encoder, w httpResponseWriterWithInit) {
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := ErrorToAPIStatus(err)
candidateStatusCode := int(status.Code)
// If original statusCode was not successful, we need to return the original error.
// We cannot hide it behind serialization problems
if w.statusCode >= http.StatusOK && w.statusCode < http.StatusBadRequest {
w.statusCode = candidateStatusCode
}
output, err := runtime.Encode(codec, status)
if err != nil {
w.mediaType = "text/plain"
output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message))
}
w.Write(output)
}
// WriteRawJSON writes a non-API object in JSON.
func WriteRawJSON(statusCode int, object interface{}, w http.ResponseWriter) {
output, err := json.MarshalIndent(object, "", " ")

View File

@ -0,0 +1,303 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package responsewriters
import (
"bytes"
"compress/gzip"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
func TestSerializeObject(t *testing.T) {
smallPayload := []byte("{test-object,test-object}")
largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1)
tests := []struct {
name string
compressionEnabled bool
mediaType string
out []byte
outErrs []error
req *http.Request
statusCode int
object runtime.Object
wantCode int
wantHeaders http.Header
wantBody []byte
}{
{
name: "serialize object",
out: smallPayload,
req: &http.Request{Header: http.Header{}},
wantCode: http.StatusOK,
wantHeaders: http.Header{"Content-Type": []string{""}},
wantBody: smallPayload,
},
{
name: "return content type",
out: smallPayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
wantCode: http.StatusOK,
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
wantBody: smallPayload,
},
{
name: "return status code",
statusCode: http.StatusBadRequest,
out: smallPayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
wantCode: http.StatusBadRequest,
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
wantBody: smallPayload,
},
{
name: "fail to encode object",
out: smallPayload,
outErrs: []error{fmt.Errorf("bad")},
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
wantCode: http.StatusInternalServerError,
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
wantBody: smallPayload,
},
{
name: "fail to encode object or status",
out: smallPayload,
outErrs: []error{fmt.Errorf("bad"), fmt.Errorf("bad2")},
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
wantCode: http.StatusInternalServerError,
wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
wantBody: []byte(": bad"),
},
{
name: "fail to encode object or status with status code",
out: smallPayload,
outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
statusCode: http.StatusOK,
wantCode: http.StatusNotFound,
wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
wantBody: []byte("NotFound: \"test\" not found"),
},
{
name: "fail to encode object or status with status code and keeps previous error",
out: smallPayload,
outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
statusCode: http.StatusNotAcceptable,
wantCode: http.StatusNotAcceptable,
wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
wantBody: []byte("NotFound: \"test\" not found"),
},
{
name: "compression requires feature gate",
out: largePayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{"gzip"},
}},
wantCode: http.StatusOK,
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
wantBody: largePayload,
},
{
name: "compress on gzip",
compressionEnabled: true,
out: largePayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{"gzip"},
}},
wantCode: http.StatusOK,
wantHeaders: http.Header{
"Content-Type": []string{"application/json"},
"Content-Encoding": []string{"gzip"},
"Vary": []string{"Accept-Encoding"},
},
wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel),
},
{
name: "compression is not performed on small objects",
compressionEnabled: true,
out: smallPayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{"gzip"},
}},
wantCode: http.StatusOK,
wantHeaders: http.Header{
"Content-Type": []string{"application/json"},
},
wantBody: smallPayload,
},
{
name: "compress when multiple encodings are requested",
compressionEnabled: true,
out: largePayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{"deflate, , gzip,"},
}},
wantCode: http.StatusOK,
wantHeaders: http.Header{
"Content-Type": []string{"application/json"},
"Content-Encoding": []string{"gzip"},
"Vary": []string{"Accept-Encoding"},
},
wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel),
},
{
name: "ignore compression on deflate",
compressionEnabled: true,
out: largePayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{"deflate"},
}},
wantCode: http.StatusOK,
wantHeaders: http.Header{
"Content-Type": []string{"application/json"},
},
wantBody: largePayload,
},
{
name: "ignore compression on unrecognized types",
compressionEnabled: true,
out: largePayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{", , other, nothing, what, "},
}},
wantCode: http.StatusOK,
wantHeaders: http.Header{
"Content-Type": []string{"application/json"},
},
wantBody: largePayload,
},
{
name: "errors are compressed",
compressionEnabled: true,
statusCode: http.StatusInternalServerError,
out: smallPayload,
outErrs: []error{fmt.Errorf(string(largePayload)), fmt.Errorf("bad2")},
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{"gzip"},
}},
wantCode: http.StatusInternalServerError,
wantHeaders: http.Header{
"Content-Type": []string{"text/plain"},
"Content-Encoding": []string{"gzip"},
"Vary": []string{"Accept-Encoding"},
},
wantBody: gzipContent([]byte(": "+string(largePayload)), defaultGzipContentEncodingLevel),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, tt.compressionEnabled)()
encoder := &fakeEncoder{
buf: tt.out,
errs: tt.outErrs,
}
if tt.statusCode == 0 {
tt.statusCode = http.StatusOK
}
recorder := httptest.NewRecorder()
SerializeObject(tt.mediaType, encoder, recorder, tt.req, tt.statusCode, tt.object)
result := recorder.Result()
if result.StatusCode != tt.wantCode {
t.Fatalf("unexpected code: %v", result.StatusCode)
}
if !reflect.DeepEqual(result.Header, tt.wantHeaders) {
t.Fatal(diff.ObjectReflectDiff(tt.wantHeaders, result.Header))
}
body, _ := ioutil.ReadAll(result.Body)
if !bytes.Equal(tt.wantBody, body) {
t.Fatalf("wanted:\n%s\ngot:\n%s", hex.Dump(tt.wantBody), hex.Dump(body))
}
})
}
}
type fakeEncoder struct {
obj runtime.Object
buf []byte
errs []error
}
func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error {
e.obj = obj
if len(e.errs) > 0 {
err := e.errs[0]
e.errs = e.errs[1:]
return err
}
_, err := w.Write(e.buf)
return err
}
func gzipContent(data []byte, level int) []byte {
buf := &bytes.Buffer{}
gw, err := gzip.NewWriterLevel(buf, level)
if err != nil {
panic(err)
}
if _, err := gw.Write(data); err != nil {
panic(err)
}
if err := gw.Close(); err != nil {
panic(err)
}
return buf.Bytes()
}

View File

@ -40,7 +40,6 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
genericfilters "k8s.io/apiserver/pkg/server/filters"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
@ -50,10 +49,9 @@ const (
)
type APIInstaller struct {
group *APIGroupVersion
prefix string // Path prefix where API resources are to be registered.
minRequestTimeout time.Duration
enableAPIResponseCompression bool
group *APIGroupVersion
prefix string // Path prefix where API resources are to be registered.
minRequestTimeout time.Duration
}
// Struct capturing information about an action ("GET", "POST", "WATCH", "PROXY", etc).
@ -630,9 +628,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
}
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler)
}
doc := "read the specified " + kind
if isSubresource {
doc = "read " + subresource + " of the specified " + kind
@ -662,9 +657,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "list " + subresource + " of objects of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler)
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).

View File

@ -145,7 +145,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha},
APIResponseCompression: {Default: false, PreRelease: featuregate.Alpha},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.Beta},
RemainingItemCount: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -95,7 +95,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
@ -103,7 +102,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -54,13 +54,11 @@ import (
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
serverstore "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"
certutil "k8s.io/client-go/util/cert"
@ -181,10 +179,6 @@ type Config struct {
// Predicate which is true for paths of long-running http requests
LongRunningFunc apirequest.LongRunningRequestCheck
// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool
// MergedResourceConfig indicates which groupVersion enabled and its resources enabled/disabled.
// This is composed of genericapiserver defaultAPIResourceConfig and those parsed from flags.
// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
@ -298,8 +292,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// proto when persisted in etcd. Assuming the upper bound of
// the size ratio is 10:1, we set 100MB as the largest request
// body size to be accepted and decoded in a write request.
MaxRequestBodyBytes: int64(100 * 1024 * 1024),
EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression),
MaxRequestBodyBytes: int64(100 * 1024 * 1024),
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
@ -511,9 +504,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
enableAPIResponseCompression: c.EnableAPIResponseCompression,
maxRequestBodyBytes: c.MaxRequestBodyBytes,
healthzClock: clock.RealClock{},
maxRequestBodyBytes: c.MaxRequestBodyBytes,
healthzClock: clock.RealClock{},
}
for {

View File

@ -9,7 +9,6 @@ load(
go_test(
name = "go_default_test",
srcs = [
"compression_test.go",
"content_type_test.go",
"cors_test.go",
"maxinflight_test.go",
@ -32,7 +31,6 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"compression.go",
"content_type.go",
"cors.go",
"doc.go",
@ -55,7 +53,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -1,181 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/endpoints/request"
)
// Compressor is an interface to compression writers
type Compressor interface {
io.WriteCloser
Flush() error
}
const (
headerAcceptEncoding = "Accept-Encoding"
headerContentEncoding = "Content-Encoding"
encodingGzip = "gzip"
encodingDeflate = "deflate"
)
// WithCompression wraps an http.Handler with the Compression Handler
func WithCompression(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wantsCompression, encoding := wantsCompressedResponse(req)
w.Header().Set("Vary", "Accept-Encoding")
if wantsCompression {
compressionWriter, err := NewCompressionResponseWriter(w, encoding)
if err != nil {
handleError(w, req, err)
runtime.HandleError(fmt.Errorf("failed to compress HTTP response: %v", err))
return
}
compressionWriter.Header().Set("Content-Encoding", encoding)
handler.ServeHTTP(compressionWriter, req)
compressionWriter.(*compressionResponseWriter).Close()
} else {
handler.ServeHTTP(w, req)
}
})
}
// wantsCompressedResponse reads the Accept-Encoding header to see if and which encoding is requested.
func wantsCompressedResponse(req *http.Request) (bool, string) {
// don't compress watches
ctx := req.Context()
info, ok := request.RequestInfoFrom(ctx)
if !ok {
return false, ""
}
if !info.IsResourceRequest {
return false, ""
}
if info.Verb == "watch" {
return false, ""
}
header := req.Header.Get(headerAcceptEncoding)
gi := strings.Index(header, encodingGzip)
zi := strings.Index(header, encodingDeflate)
// use in order of appearance
switch {
case gi == -1:
return zi != -1, encodingDeflate
case zi == -1:
return gi != -1, encodingGzip
case gi < zi:
return true, encodingGzip
default:
return true, encodingDeflate
}
}
type compressionResponseWriter struct {
writer http.ResponseWriter
compressor Compressor
encoding string
}
// NewCompressionResponseWriter returns wraps w with a compression ResponseWriter, using the given encoding
func NewCompressionResponseWriter(w http.ResponseWriter, encoding string) (http.ResponseWriter, error) {
var compressor Compressor
switch encoding {
case encodingGzip:
compressor = gzip.NewWriter(w)
case encodingDeflate:
compressor = zlib.NewWriter(w)
default:
return nil, fmt.Errorf("%s is not a supported encoding type", encoding)
}
return &compressionResponseWriter{
writer: w,
compressor: compressor,
encoding: encoding,
}, nil
}
// compressionResponseWriter implements http.Responsewriter Interface
var _ http.ResponseWriter = &compressionResponseWriter{}
func (c *compressionResponseWriter) Header() http.Header {
return c.writer.Header()
}
// compress data according to compression method
func (c *compressionResponseWriter) Write(p []byte) (int, error) {
if c.compressorClosed() {
return -1, errors.New("compressing error: tried to write data using closed compressor")
}
c.Header().Set(headerContentEncoding, c.encoding)
defer c.compressor.Flush()
return c.compressor.Write(p)
}
func (c *compressionResponseWriter) WriteHeader(status int) {
c.writer.WriteHeader(status)
}
// CloseNotify is part of http.CloseNotifier interface
func (c *compressionResponseWriter) CloseNotify() <-chan bool {
return c.writer.(http.CloseNotifier).CloseNotify()
}
// Close the underlying compressor
func (c *compressionResponseWriter) Close() error {
if c.compressorClosed() {
return errors.New("Compressing error: tried to close already closed compressor")
}
c.compressor.Close()
c.compressor = nil
return nil
}
func (c *compressionResponseWriter) Flush() {
if c.compressorClosed() {
return
}
c.compressor.Flush()
}
func (c *compressionResponseWriter) compressorClosed() bool {
return nil == c.compressor
}
// RestfulWithCompression wraps WithCompression to be compatible with go-restful
func RestfulWithCompression(function restful.RouteFunction) restful.RouteFunction {
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
handler := WithCompression(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
response.ResponseWriter = w
request.Request = req
function(request, response)
}))
handler.ServeHTTP(response.ResponseWriter, request.Request)
})
}

View File

@ -1,106 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/request"
)
func TestCompression(t *testing.T) {
tests := []struct {
encoding string
watch bool
}{
{"", false},
{"gzip", true},
{"gzip", false},
}
responseData := []byte("1234")
for _, test := range tests {
handler := WithCompression(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write(responseData)
}),
)
handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver())
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{
Transport: &http.Transport{
DisableCompression: true,
},
}
url := server.URL + "/api/v1/pods"
if test.watch {
url = url + "?watch=1"
}
request, err := http.NewRequest("GET", url, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
request.Header.Set("Accept-Encoding", test.encoding)
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
var reader io.Reader
if test.encoding == "gzip" && !test.watch {
if response.Header.Get("Content-Encoding") != "gzip" {
t.Fatal("expected response header Content-Encoding to be set to \"gzip\"")
}
if response.Header.Get("Vary") != "Accept-Encoding" {
t.Fatal("expected response header Vary to be set to \"Accept-Encoding\"")
}
reader, err = gzip.NewReader(response.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
} else {
if response.Header.Get("Content-Encoding") == "gzip" {
t.Fatal("expected response header Content-Encoding not to be set")
}
reader = response.Body
}
body, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !bytes.Equal(body, responseData) {
t.Fatalf("Expected response body %s to equal %s", body, responseData)
}
}
}
func newTestRequestInfoResolver() *request.RequestInfoFactory {
return &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
}

View File

@ -493,10 +493,9 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
EquivalentResourceRegistry: s.EquivalentResourceRegistry,
Admit: s.admissionControl,
MinRequestTimeout: s.minRequestTimeout,
EnableAPIResponseCompression: s.enableAPIResponseCompression,
Authorizer: s.Authorizer,
Admit: s.admissionControl,
MinRequestTimeout: s.minRequestTimeout,
Authorizer: s.Authorizer,
}
}