mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Replace HTTP compression with an inline handler
The previous HTTP compression implementation functioned as a filter, which required it to deal with a number of special cases that complicated the implementation. Instead, when we write an API object to a response, handle only that one case. This will allow a more limited implementation that does not impact other code flows. Also, to prevent excessive CPU use on small objects, compression is disabled on responses smaller than 128Kb in size.
This commit is contained in:
parent
a872c6826c
commit
4ed2b9875d
@ -52,7 +52,6 @@ go_test(
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
@ -88,7 +87,6 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library",
|
||||
|
@ -71,7 +71,6 @@ import (
|
||||
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/server/filters"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
)
|
||||
@ -1219,7 +1218,12 @@ func TestListCompression(t *testing.T) {
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace}
|
||||
simpleStorage := SimpleRESTStorage{
|
||||
expectedResourceNamespace: testCase.namespace,
|
||||
list: []genericapitesting.Simple{
|
||||
{Other: strings.Repeat("0123456789abcdef", (128*1024/16)+1)},
|
||||
},
|
||||
}
|
||||
storage["simple"] = &simpleStorage
|
||||
selfLinker := &setTestSelfLinker{
|
||||
t: t,
|
||||
@ -1228,7 +1232,6 @@ func TestListCompression(t *testing.T) {
|
||||
}
|
||||
var handler = handleInternal(storage, admissionControl, selfLinker, nil)
|
||||
|
||||
handler = filters.WithCompression(handler)
|
||||
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver())
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
@ -1656,13 +1659,53 @@ func BenchmarkGet(b *testing.B) {
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
func TestGetCompression(t *testing.T) {
|
||||
func BenchmarkGetNoCompression(b *testing.B) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{
|
||||
item: genericapitesting.Simple{
|
||||
Other: "foo",
|
||||
},
|
||||
}
|
||||
selfLinker := &setTestSelfLinker{
|
||||
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
|
||||
name: "id",
|
||||
namespace: "default",
|
||||
}
|
||||
storage["simple"] = &simpleStorage
|
||||
handler := handleLinker(storage, selfLinker)
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DisableCompression: true,
|
||||
},
|
||||
}
|
||||
|
||||
u := server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id"
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
resp, err := client.Get(u)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b.Fatalf("unexpected response: %#v", resp)
|
||||
}
|
||||
if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
|
||||
b.Fatalf("unable to read body")
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
}
|
||||
func TestGetCompression(t *testing.T) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{
|
||||
item: genericapitesting.Simple{
|
||||
Other: strings.Repeat("0123456789abcdef", (128*1024/16)+1),
|
||||
},
|
||||
}
|
||||
selfLinker := &setTestSelfLinker{
|
||||
t: t,
|
||||
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
|
||||
@ -1672,7 +1715,6 @@ func TestGetCompression(t *testing.T) {
|
||||
|
||||
storage["simple"] = &simpleStorage
|
||||
handler := handleLinker(storage, selfLinker)
|
||||
handler = filters.WithCompression(handler)
|
||||
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver())
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
@ -1687,7 +1729,7 @@ func TestGetCompression(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error cretaing request: %v", err)
|
||||
t.Fatalf("unexpected error creating request: %v", err)
|
||||
}
|
||||
// It's necessary to manually set Accept-Encoding here
|
||||
// to prevent http.DefaultClient from automatically
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
restful "github.com/emicklei/go-restful"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -83,10 +83,6 @@ type APIGroupVersion struct {
|
||||
|
||||
MinRequestTimeout time.Duration
|
||||
|
||||
// EnableAPIResponseCompression indicates whether API Responses should support compression
|
||||
// if the client requests it via Accept-Encoding
|
||||
EnableAPIResponseCompression bool
|
||||
|
||||
// OpenAPIModels exposes the OpenAPI models to each individual handler.
|
||||
OpenAPIModels openapiproto.Models
|
||||
|
||||
@ -101,10 +97,9 @@ type APIGroupVersion struct {
|
||||
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
|
||||
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
|
||||
installer := &APIInstaller{
|
||||
group: g,
|
||||
prefix: prefix,
|
||||
minRequestTimeout: g.MinRequestTimeout,
|
||||
enableAPIResponseCompression: g.EnableAPIResponseCompression,
|
||||
group: g,
|
||||
prefix: prefix,
|
||||
minRequestTimeout: g.MinRequestTimeout,
|
||||
}
|
||||
|
||||
apiResources, ws, registrationErrors := installer.Install()
|
||||
|
@ -11,6 +11,7 @@ go_test(
|
||||
srcs = [
|
||||
"errors_test.go",
|
||||
"status_test.go",
|
||||
"writers_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
@ -19,9 +20,13 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@ -46,8 +51,10 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/wsstream:go_default_library",
|
||||
],
|
||||
|
@ -17,11 +17,17 @@ limitations under the License.
|
||||
package responsewriters
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@ -31,30 +37,11 @@ import (
|
||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/apiserver/pkg/util/flushwriter"
|
||||
"k8s.io/apiserver/pkg/util/wsstream"
|
||||
)
|
||||
|
||||
// httpResponseWriterWithInit wraps http.ResponseWriter, and implements the io.Writer interface to be used
|
||||
// with encoding. The purpose is to allow for encoding to a stream, while accommodating a custom HTTP status code
|
||||
// if encoding fails, and meeting the encoder's io.Writer interface requirement.
|
||||
type httpResponseWriterWithInit struct {
|
||||
hasWritten bool
|
||||
mediaType string
|
||||
statusCode int
|
||||
innerW http.ResponseWriter
|
||||
}
|
||||
|
||||
func (w httpResponseWriterWithInit) Write(b []byte) (n int, err error) {
|
||||
if !w.hasWritten {
|
||||
w.innerW.Header().Set("Content-Type", w.mediaType)
|
||||
w.innerW.WriteHeader(w.statusCode)
|
||||
w.hasWritten = true
|
||||
}
|
||||
|
||||
return w.innerW.Write(b)
|
||||
}
|
||||
|
||||
// StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response.
|
||||
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
|
||||
// browser clients cannot easily handle binary streaming protocols).
|
||||
@ -96,15 +83,142 @@ func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSe
|
||||
}
|
||||
|
||||
// SerializeObject renders an object in the content type negotiated by the client using the provided encoder.
|
||||
// The context is optional and can be nil.
|
||||
func SerializeObject(mediaType string, encoder runtime.Encoder, innerW http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
|
||||
w := httpResponseWriterWithInit{mediaType: mediaType, innerW: innerW, statusCode: statusCode}
|
||||
|
||||
if err := encoder.Encode(object, w); err != nil {
|
||||
errSerializationFatal(err, encoder, w)
|
||||
// The context is optional and can be nil. This method will perform optional content compression if requested by
|
||||
// a client and the feature gate for APIResponseCompression is enabled.
|
||||
func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
|
||||
w := &deferredResponseWriter{
|
||||
mediaType: mediaType,
|
||||
statusCode: statusCode,
|
||||
contentEncoding: negotiateContentEncoding(req),
|
||||
hw: hw,
|
||||
}
|
||||
|
||||
err := encoder.Encode(object, w)
|
||||
if err == nil {
|
||||
err = w.Close()
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// make a best effort to write the object if a failure is detected
|
||||
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
|
||||
status := ErrorToAPIStatus(err)
|
||||
candidateStatusCode := int(status.Code)
|
||||
// if the current status code is successful, allow the error's status code to overwrite it
|
||||
if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest {
|
||||
w.statusCode = candidateStatusCode
|
||||
}
|
||||
output, err := runtime.Encode(encoder, status)
|
||||
if err != nil {
|
||||
w.mediaType = "text/plain"
|
||||
output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message))
|
||||
}
|
||||
if _, err := w.Write(output); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a fallback JSON response: %v", err))
|
||||
}
|
||||
w.Close()
|
||||
}
|
||||
|
||||
var gzipPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
gw, err := gzip.NewWriterLevel(nil, defaultGzipContentEncodingLevel)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return gw
|
||||
},
|
||||
}
|
||||
|
||||
const (
|
||||
// defaultGzipContentEncodingLevel is set to 4 which uses less CPU than the default level
|
||||
defaultGzipContentEncodingLevel = 4
|
||||
// defaultGzipThresholdBytes is compared to the size of the first write from the stream
|
||||
// (usually the entire object), and if the size is smaller no gzipping will be performed
|
||||
// if the client requests it.
|
||||
defaultGzipThresholdBytes = 128 * 1024
|
||||
)
|
||||
|
||||
// negotiateContentEncoding returns a supported client-requested content encoding for the
|
||||
// provided request. It will return the empty string if no supported content encoding was
|
||||
// found or if response compression is disabled.
|
||||
func negotiateContentEncoding(req *http.Request) string {
|
||||
encoding := req.Header.Get("Accept-Encoding")
|
||||
if len(encoding) == 0 {
|
||||
return ""
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression) {
|
||||
return ""
|
||||
}
|
||||
for len(encoding) > 0 {
|
||||
var token string
|
||||
if next := strings.Index(encoding, ","); next != -1 {
|
||||
token = encoding[:next]
|
||||
encoding = encoding[next+1:]
|
||||
} else {
|
||||
token = encoding
|
||||
encoding = ""
|
||||
}
|
||||
switch strings.TrimSpace(token) {
|
||||
case "gzip":
|
||||
return "gzip"
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type deferredResponseWriter struct {
|
||||
mediaType string
|
||||
statusCode int
|
||||
contentEncoding string
|
||||
|
||||
hasWritten bool
|
||||
hw http.ResponseWriter
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
|
||||
if w.hasWritten {
|
||||
return w.w.Write(p)
|
||||
}
|
||||
w.hasWritten = true
|
||||
|
||||
hw := w.hw
|
||||
header := hw.Header()
|
||||
switch {
|
||||
case w.contentEncoding == "gzip" && len(p) > defaultGzipThresholdBytes:
|
||||
header.Set("Content-Encoding", "gzip")
|
||||
header.Add("Vary", "Accept-Encoding")
|
||||
|
||||
gw := gzipPool.Get().(*gzip.Writer)
|
||||
gw.Reset(hw)
|
||||
|
||||
w.w = gw
|
||||
default:
|
||||
w.w = hw
|
||||
}
|
||||
|
||||
header.Set("Content-Type", w.mediaType)
|
||||
hw.WriteHeader(w.statusCode)
|
||||
return w.w.Write(p)
|
||||
}
|
||||
|
||||
func (w *deferredResponseWriter) Close() error {
|
||||
if !w.hasWritten {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
switch t := w.w.(type) {
|
||||
case *gzip.Writer:
|
||||
err = t.Close()
|
||||
t.Reset(nil)
|
||||
gzipPool.Put(t)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var nopCloser = ioutil.NopCloser(nil)
|
||||
|
||||
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
|
||||
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
|
||||
stream, ok := object.(rest.ResourceStreamer)
|
||||
@ -157,25 +271,6 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
|
||||
return code
|
||||
}
|
||||
|
||||
// errSerializationFatal renders an error to the response, and if codec fails will render plaintext.
|
||||
// Returns the HTTP status code of the error.
|
||||
func errSerializationFatal(err error, codec runtime.Encoder, w httpResponseWriterWithInit) {
|
||||
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
|
||||
status := ErrorToAPIStatus(err)
|
||||
candidateStatusCode := int(status.Code)
|
||||
// If original statusCode was not successful, we need to return the original error.
|
||||
// We cannot hide it behind serialization problems
|
||||
if w.statusCode >= http.StatusOK && w.statusCode < http.StatusBadRequest {
|
||||
w.statusCode = candidateStatusCode
|
||||
}
|
||||
output, err := runtime.Encode(codec, status)
|
||||
if err != nil {
|
||||
w.mediaType = "text/plain"
|
||||
output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message))
|
||||
}
|
||||
w.Write(output)
|
||||
}
|
||||
|
||||
// WriteRawJSON writes a non-API object in JSON.
|
||||
func WriteRawJSON(statusCode int, object interface{}, w http.ResponseWriter) {
|
||||
output, err := json.MarshalIndent(object, "", " ")
|
||||
|
@ -0,0 +1,303 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package responsewriters
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
)
|
||||
|
||||
func TestSerializeObject(t *testing.T) {
|
||||
smallPayload := []byte("{test-object,test-object}")
|
||||
largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1)
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
compressionEnabled bool
|
||||
|
||||
mediaType string
|
||||
out []byte
|
||||
outErrs []error
|
||||
req *http.Request
|
||||
statusCode int
|
||||
object runtime.Object
|
||||
|
||||
wantCode int
|
||||
wantHeaders http.Header
|
||||
wantBody []byte
|
||||
}{
|
||||
{
|
||||
name: "serialize object",
|
||||
out: smallPayload,
|
||||
req: &http.Request{Header: http.Header{}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{"Content-Type": []string{""}},
|
||||
wantBody: smallPayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "return content type",
|
||||
out: smallPayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
|
||||
wantBody: smallPayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "return status code",
|
||||
statusCode: http.StatusBadRequest,
|
||||
out: smallPayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{}},
|
||||
wantCode: http.StatusBadRequest,
|
||||
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
|
||||
wantBody: smallPayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "fail to encode object",
|
||||
out: smallPayload,
|
||||
outErrs: []error{fmt.Errorf("bad")},
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{}},
|
||||
wantCode: http.StatusInternalServerError,
|
||||
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
|
||||
wantBody: smallPayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "fail to encode object or status",
|
||||
out: smallPayload,
|
||||
outErrs: []error{fmt.Errorf("bad"), fmt.Errorf("bad2")},
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{}},
|
||||
wantCode: http.StatusInternalServerError,
|
||||
wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
|
||||
wantBody: []byte(": bad"),
|
||||
},
|
||||
|
||||
{
|
||||
name: "fail to encode object or status with status code",
|
||||
out: smallPayload,
|
||||
outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{}},
|
||||
statusCode: http.StatusOK,
|
||||
wantCode: http.StatusNotFound,
|
||||
wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
|
||||
wantBody: []byte("NotFound: \"test\" not found"),
|
||||
},
|
||||
|
||||
{
|
||||
name: "fail to encode object or status with status code and keeps previous error",
|
||||
out: smallPayload,
|
||||
outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{}},
|
||||
statusCode: http.StatusNotAcceptable,
|
||||
wantCode: http.StatusNotAcceptable,
|
||||
wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
|
||||
wantBody: []byte("NotFound: \"test\" not found"),
|
||||
},
|
||||
|
||||
{
|
||||
name: "compression requires feature gate",
|
||||
out: largePayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{
|
||||
"Accept-Encoding": []string{"gzip"},
|
||||
}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
|
||||
wantBody: largePayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "compress on gzip",
|
||||
compressionEnabled: true,
|
||||
out: largePayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{
|
||||
"Accept-Encoding": []string{"gzip"},
|
||||
}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{
|
||||
"Content-Type": []string{"application/json"},
|
||||
"Content-Encoding": []string{"gzip"},
|
||||
"Vary": []string{"Accept-Encoding"},
|
||||
},
|
||||
wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel),
|
||||
},
|
||||
|
||||
{
|
||||
name: "compression is not performed on small objects",
|
||||
compressionEnabled: true,
|
||||
out: smallPayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{
|
||||
"Accept-Encoding": []string{"gzip"},
|
||||
}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{
|
||||
"Content-Type": []string{"application/json"},
|
||||
},
|
||||
wantBody: smallPayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "compress when multiple encodings are requested",
|
||||
compressionEnabled: true,
|
||||
out: largePayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{
|
||||
"Accept-Encoding": []string{"deflate, , gzip,"},
|
||||
}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{
|
||||
"Content-Type": []string{"application/json"},
|
||||
"Content-Encoding": []string{"gzip"},
|
||||
"Vary": []string{"Accept-Encoding"},
|
||||
},
|
||||
wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel),
|
||||
},
|
||||
|
||||
{
|
||||
name: "ignore compression on deflate",
|
||||
compressionEnabled: true,
|
||||
out: largePayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{
|
||||
"Accept-Encoding": []string{"deflate"},
|
||||
}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{
|
||||
"Content-Type": []string{"application/json"},
|
||||
},
|
||||
wantBody: largePayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "ignore compression on unrecognized types",
|
||||
compressionEnabled: true,
|
||||
out: largePayload,
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{
|
||||
"Accept-Encoding": []string{", , other, nothing, what, "},
|
||||
}},
|
||||
wantCode: http.StatusOK,
|
||||
wantHeaders: http.Header{
|
||||
"Content-Type": []string{"application/json"},
|
||||
},
|
||||
wantBody: largePayload,
|
||||
},
|
||||
|
||||
{
|
||||
name: "errors are compressed",
|
||||
compressionEnabled: true,
|
||||
statusCode: http.StatusInternalServerError,
|
||||
out: smallPayload,
|
||||
outErrs: []error{fmt.Errorf(string(largePayload)), fmt.Errorf("bad2")},
|
||||
mediaType: "application/json",
|
||||
req: &http.Request{Header: http.Header{
|
||||
"Accept-Encoding": []string{"gzip"},
|
||||
}},
|
||||
wantCode: http.StatusInternalServerError,
|
||||
wantHeaders: http.Header{
|
||||
"Content-Type": []string{"text/plain"},
|
||||
"Content-Encoding": []string{"gzip"},
|
||||
"Vary": []string{"Accept-Encoding"},
|
||||
},
|
||||
wantBody: gzipContent([]byte(": "+string(largePayload)), defaultGzipContentEncodingLevel),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, tt.compressionEnabled)()
|
||||
|
||||
encoder := &fakeEncoder{
|
||||
buf: tt.out,
|
||||
errs: tt.outErrs,
|
||||
}
|
||||
if tt.statusCode == 0 {
|
||||
tt.statusCode = http.StatusOK
|
||||
}
|
||||
recorder := httptest.NewRecorder()
|
||||
SerializeObject(tt.mediaType, encoder, recorder, tt.req, tt.statusCode, tt.object)
|
||||
result := recorder.Result()
|
||||
if result.StatusCode != tt.wantCode {
|
||||
t.Fatalf("unexpected code: %v", result.StatusCode)
|
||||
}
|
||||
if !reflect.DeepEqual(result.Header, tt.wantHeaders) {
|
||||
t.Fatal(diff.ObjectReflectDiff(tt.wantHeaders, result.Header))
|
||||
}
|
||||
body, _ := ioutil.ReadAll(result.Body)
|
||||
if !bytes.Equal(tt.wantBody, body) {
|
||||
t.Fatalf("wanted:\n%s\ngot:\n%s", hex.Dump(tt.wantBody), hex.Dump(body))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeEncoder struct {
|
||||
obj runtime.Object
|
||||
buf []byte
|
||||
errs []error
|
||||
}
|
||||
|
||||
func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error {
|
||||
e.obj = obj
|
||||
if len(e.errs) > 0 {
|
||||
err := e.errs[0]
|
||||
e.errs = e.errs[1:]
|
||||
return err
|
||||
}
|
||||
_, err := w.Write(e.buf)
|
||||
return err
|
||||
}
|
||||
|
||||
func gzipContent(data []byte, level int) []byte {
|
||||
buf := &bytes.Buffer{}
|
||||
gw, err := gzip.NewWriterLevel(buf, level)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if _, err := gw.Write(data); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := gw.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return buf.Bytes()
|
||||
}
|
@ -40,7 +40,6 @@ import (
|
||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
)
|
||||
|
||||
@ -50,10 +49,9 @@ const (
|
||||
)
|
||||
|
||||
type APIInstaller struct {
|
||||
group *APIGroupVersion
|
||||
prefix string // Path prefix where API resources are to be registered.
|
||||
minRequestTimeout time.Duration
|
||||
enableAPIResponseCompression bool
|
||||
group *APIGroupVersion
|
||||
prefix string // Path prefix where API resources are to be registered.
|
||||
minRequestTimeout time.Duration
|
||||
}
|
||||
|
||||
// Struct capturing information about an action ("GET", "POST", "WATCH", "PROXY", etc).
|
||||
@ -630,9 +628,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
|
||||
}
|
||||
|
||||
if a.enableAPIResponseCompression {
|
||||
handler = genericfilters.RestfulWithCompression(handler)
|
||||
}
|
||||
doc := "read the specified " + kind
|
||||
if isSubresource {
|
||||
doc = "read " + subresource + " of the specified " + kind
|
||||
@ -662,9 +657,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
doc = "list " + subresource + " of objects of kind " + kind
|
||||
}
|
||||
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
|
||||
if a.enableAPIResponseCompression {
|
||||
handler = genericfilters.RestfulWithCompression(handler)
|
||||
}
|
||||
route := ws.GET(action.Path).To(handler).
|
||||
Doc(doc).
|
||||
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
|
||||
|
@ -95,7 +95,6 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
||||
@ -103,7 +102,6 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
|
@ -54,13 +54,11 @@ import (
|
||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic"
|
||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/routes"
|
||||
serverstore "k8s.io/apiserver/pkg/server/storage"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
@ -181,10 +179,6 @@ type Config struct {
|
||||
// Predicate which is true for paths of long-running http requests
|
||||
LongRunningFunc apirequest.LongRunningRequestCheck
|
||||
|
||||
// EnableAPIResponseCompression indicates whether API Responses should support compression
|
||||
// if the client requests it via Accept-Encoding
|
||||
EnableAPIResponseCompression bool
|
||||
|
||||
// MergedResourceConfig indicates which groupVersion enabled and its resources enabled/disabled.
|
||||
// This is composed of genericapiserver defaultAPIResourceConfig and those parsed from flags.
|
||||
// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
|
||||
@ -298,8 +292,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
||||
// proto when persisted in etcd. Assuming the upper bound of
|
||||
// the size ratio is 10:1, we set 100MB as the largest request
|
||||
// body size to be accepted and decoded in a write request.
|
||||
MaxRequestBodyBytes: int64(100 * 1024 * 1024),
|
||||
EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression),
|
||||
MaxRequestBodyBytes: int64(100 * 1024 * 1024),
|
||||
|
||||
// Default to treating watch as a long-running operation
|
||||
// Generic API servers have no inherent long-running subresources
|
||||
@ -511,9 +504,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||
|
||||
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
|
||||
|
||||
enableAPIResponseCompression: c.EnableAPIResponseCompression,
|
||||
maxRequestBodyBytes: c.MaxRequestBodyBytes,
|
||||
healthzClock: clock.RealClock{},
|
||||
maxRequestBodyBytes: c.MaxRequestBodyBytes,
|
||||
healthzClock: clock.RealClock{},
|
||||
}
|
||||
|
||||
for {
|
||||
|
@ -9,7 +9,6 @@ load(
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"compression_test.go",
|
||||
"content_type_test.go",
|
||||
"cors_test.go",
|
||||
"maxinflight_test.go",
|
||||
@ -32,7 +31,6 @@ go_test(
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"compression.go",
|
||||
"content_type.go",
|
||||
"cors.go",
|
||||
"doc.go",
|
||||
@ -55,7 +53,6 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
|
||||
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -1,181 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package filters
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"compress/zlib"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
// Compressor is an interface to compression writers
|
||||
type Compressor interface {
|
||||
io.WriteCloser
|
||||
Flush() error
|
||||
}
|
||||
|
||||
const (
|
||||
headerAcceptEncoding = "Accept-Encoding"
|
||||
headerContentEncoding = "Content-Encoding"
|
||||
|
||||
encodingGzip = "gzip"
|
||||
encodingDeflate = "deflate"
|
||||
)
|
||||
|
||||
// WithCompression wraps an http.Handler with the Compression Handler
|
||||
func WithCompression(handler http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
wantsCompression, encoding := wantsCompressedResponse(req)
|
||||
w.Header().Set("Vary", "Accept-Encoding")
|
||||
if wantsCompression {
|
||||
compressionWriter, err := NewCompressionResponseWriter(w, encoding)
|
||||
if err != nil {
|
||||
handleError(w, req, err)
|
||||
runtime.HandleError(fmt.Errorf("failed to compress HTTP response: %v", err))
|
||||
return
|
||||
}
|
||||
compressionWriter.Header().Set("Content-Encoding", encoding)
|
||||
handler.ServeHTTP(compressionWriter, req)
|
||||
compressionWriter.(*compressionResponseWriter).Close()
|
||||
} else {
|
||||
handler.ServeHTTP(w, req)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// wantsCompressedResponse reads the Accept-Encoding header to see if and which encoding is requested.
|
||||
func wantsCompressedResponse(req *http.Request) (bool, string) {
|
||||
// don't compress watches
|
||||
ctx := req.Context()
|
||||
info, ok := request.RequestInfoFrom(ctx)
|
||||
if !ok {
|
||||
return false, ""
|
||||
}
|
||||
if !info.IsResourceRequest {
|
||||
return false, ""
|
||||
}
|
||||
if info.Verb == "watch" {
|
||||
return false, ""
|
||||
}
|
||||
header := req.Header.Get(headerAcceptEncoding)
|
||||
gi := strings.Index(header, encodingGzip)
|
||||
zi := strings.Index(header, encodingDeflate)
|
||||
// use in order of appearance
|
||||
switch {
|
||||
case gi == -1:
|
||||
return zi != -1, encodingDeflate
|
||||
case zi == -1:
|
||||
return gi != -1, encodingGzip
|
||||
case gi < zi:
|
||||
return true, encodingGzip
|
||||
default:
|
||||
return true, encodingDeflate
|
||||
}
|
||||
}
|
||||
|
||||
type compressionResponseWriter struct {
|
||||
writer http.ResponseWriter
|
||||
compressor Compressor
|
||||
encoding string
|
||||
}
|
||||
|
||||
// NewCompressionResponseWriter returns wraps w with a compression ResponseWriter, using the given encoding
|
||||
func NewCompressionResponseWriter(w http.ResponseWriter, encoding string) (http.ResponseWriter, error) {
|
||||
var compressor Compressor
|
||||
switch encoding {
|
||||
case encodingGzip:
|
||||
compressor = gzip.NewWriter(w)
|
||||
case encodingDeflate:
|
||||
compressor = zlib.NewWriter(w)
|
||||
default:
|
||||
return nil, fmt.Errorf("%s is not a supported encoding type", encoding)
|
||||
}
|
||||
return &compressionResponseWriter{
|
||||
writer: w,
|
||||
compressor: compressor,
|
||||
encoding: encoding,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// compressionResponseWriter implements http.Responsewriter Interface
|
||||
var _ http.ResponseWriter = &compressionResponseWriter{}
|
||||
|
||||
func (c *compressionResponseWriter) Header() http.Header {
|
||||
return c.writer.Header()
|
||||
}
|
||||
|
||||
// compress data according to compression method
|
||||
func (c *compressionResponseWriter) Write(p []byte) (int, error) {
|
||||
if c.compressorClosed() {
|
||||
return -1, errors.New("compressing error: tried to write data using closed compressor")
|
||||
}
|
||||
c.Header().Set(headerContentEncoding, c.encoding)
|
||||
defer c.compressor.Flush()
|
||||
return c.compressor.Write(p)
|
||||
}
|
||||
|
||||
func (c *compressionResponseWriter) WriteHeader(status int) {
|
||||
c.writer.WriteHeader(status)
|
||||
}
|
||||
|
||||
// CloseNotify is part of http.CloseNotifier interface
|
||||
func (c *compressionResponseWriter) CloseNotify() <-chan bool {
|
||||
return c.writer.(http.CloseNotifier).CloseNotify()
|
||||
}
|
||||
|
||||
// Close the underlying compressor
|
||||
func (c *compressionResponseWriter) Close() error {
|
||||
if c.compressorClosed() {
|
||||
return errors.New("Compressing error: tried to close already closed compressor")
|
||||
}
|
||||
|
||||
c.compressor.Close()
|
||||
c.compressor = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *compressionResponseWriter) Flush() {
|
||||
if c.compressorClosed() {
|
||||
return
|
||||
}
|
||||
c.compressor.Flush()
|
||||
}
|
||||
|
||||
func (c *compressionResponseWriter) compressorClosed() bool {
|
||||
return nil == c.compressor
|
||||
}
|
||||
|
||||
// RestfulWithCompression wraps WithCompression to be compatible with go-restful
|
||||
func RestfulWithCompression(function restful.RouteFunction) restful.RouteFunction {
|
||||
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
|
||||
handler := WithCompression(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
response.ResponseWriter = w
|
||||
request.Request = req
|
||||
function(request, response)
|
||||
}))
|
||||
handler.ServeHTTP(response.ResponseWriter, request.Request)
|
||||
})
|
||||
}
|
@ -1,106 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package filters
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/endpoints/filters"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
func TestCompression(t *testing.T) {
|
||||
tests := []struct {
|
||||
encoding string
|
||||
watch bool
|
||||
}{
|
||||
{"", false},
|
||||
{"gzip", true},
|
||||
{"gzip", false},
|
||||
}
|
||||
|
||||
responseData := []byte("1234")
|
||||
|
||||
for _, test := range tests {
|
||||
handler := WithCompression(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
w.Write(responseData)
|
||||
}),
|
||||
)
|
||||
handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver())
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{
|
||||
Transport: &http.Transport{
|
||||
DisableCompression: true,
|
||||
},
|
||||
}
|
||||
|
||||
url := server.URL + "/api/v1/pods"
|
||||
if test.watch {
|
||||
url = url + "?watch=1"
|
||||
}
|
||||
request, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
request.Header.Set("Accept-Encoding", test.encoding)
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
var reader io.Reader
|
||||
if test.encoding == "gzip" && !test.watch {
|
||||
if response.Header.Get("Content-Encoding") != "gzip" {
|
||||
t.Fatal("expected response header Content-Encoding to be set to \"gzip\"")
|
||||
}
|
||||
if response.Header.Get("Vary") != "Accept-Encoding" {
|
||||
t.Fatal("expected response header Vary to be set to \"Accept-Encoding\"")
|
||||
}
|
||||
reader, err = gzip.NewReader(response.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
} else {
|
||||
if response.Header.Get("Content-Encoding") == "gzip" {
|
||||
t.Fatal("expected response header Content-Encoding not to be set")
|
||||
}
|
||||
reader = response.Body
|
||||
}
|
||||
body, err := ioutil.ReadAll(reader)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if !bytes.Equal(body, responseData) {
|
||||
t.Fatalf("Expected response body %s to equal %s", body, responseData)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newTestRequestInfoResolver() *request.RequestInfoFactory {
|
||||
return &request.RequestInfoFactory{
|
||||
APIPrefixes: sets.NewString("api", "apis"),
|
||||
GrouplessAPIPrefixes: sets.NewString("api"),
|
||||
}
|
||||
}
|
@ -485,10 +485,9 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
|
||||
|
||||
EquivalentResourceRegistry: s.EquivalentResourceRegistry,
|
||||
|
||||
Admit: s.admissionControl,
|
||||
MinRequestTimeout: s.minRequestTimeout,
|
||||
EnableAPIResponseCompression: s.enableAPIResponseCompression,
|
||||
Authorizer: s.Authorizer,
|
||||
Admit: s.admissionControl,
|
||||
MinRequestTimeout: s.minRequestTimeout,
|
||||
Authorizer: s.Authorizer,
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user