From fc650a54d02f358c7fc65fa25b8312028bd4e944 Mon Sep 17 00:00:00 2001 From: Scott Weiss Date: Tue, 30 May 2017 11:41:14 -0400 Subject: [PATCH] add gzip compression to GET and LIST requests closes #44164 --- .../src/k8s.io/apiserver/pkg/endpoints/BUILD | 1 + .../apiserver/pkg/endpoints/apiserver_test.go | 168 ++++++++++++++++ .../apiserver/pkg/endpoints/installer.go | 3 + .../k8s.io/apiserver/pkg/server/filters/BUILD | 3 + .../pkg/server/filters/compression.go | 183 ++++++++++++++++++ .../pkg/server/filters/compression_test.go | 110 +++++++++++ 6 files changed, 468 insertions(+) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/compression.go create mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD index 53581c3419c..14f05156382 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD @@ -84,5 +84,6 @@ go_library( "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 4d6d58970bf..27614619641 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -18,6 +18,7 @@ package endpoints import ( "bytes" + "compress/gzip" "encoding/json" "errors" "fmt" @@ -1203,6 +1204,102 @@ func TestRequestsWithInvalidQuery(t *testing.T) { } } +func TestListCompresion(t *testing.T) { + testCases := []struct { + url string + namespace string + selfLink string + legacy bool + label string + field string + acceptEncoding string + }{ + // list items in a namespace in the path + { + url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple", + namespace: "default", + selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple", + acceptEncoding: "", + }, + { + url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple", + namespace: "default", + selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple", + acceptEncoding: "gzip", + }, + } + for i, testCase := range testCases { + storage := map[string]rest.Storage{} + simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace} + storage["simple"] = &simpleStorage + selfLinker := &setTestSelfLinker{ + t: t, + namespace: testCase.namespace, + expectedSet: testCase.selfLink, + } + var handler = handleInternal(storage, admissionControl, selfLinker, nil) + server := httptest.NewServer(handler) + defer server.Close() + + req, err := http.NewRequest("GET", server.URL+testCase.url, nil) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + // It's necessary to manually set Accept-Encoding here + // to prevent http.DefaultClient from automatically + // decoding responses + req.Header.Set("Accept-Encoding", testCase.acceptEncoding) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Errorf("%d: unexpected status: %d from url %s, Expected: %d, %#v", i, resp.StatusCode, testCase.url, http.StatusOK, resp) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + t.Logf("%d: body: %s", i, string(body)) + continue + } + // TODO: future, restore get links + if !selfLinker.called { + t.Errorf("%d: never set self link", i) + } + if !simpleStorage.namespacePresent { + t.Errorf("%d: namespace not set", i) + } else if simpleStorage.actualNamespace != testCase.namespace { + t.Errorf("%d: %q unexpected resource namespace: %s", i, testCase.url, simpleStorage.actualNamespace) + } + if simpleStorage.requestedLabelSelector == nil || simpleStorage.requestedLabelSelector.String() != testCase.label { + t.Errorf("%d: unexpected label selector: %v", i, simpleStorage.requestedLabelSelector) + } + if simpleStorage.requestedFieldSelector == nil || simpleStorage.requestedFieldSelector.String() != testCase.field { + t.Errorf("%d: unexpected field selector: %v", i, simpleStorage.requestedFieldSelector) + } + + var decoder *json.Decoder + if testCase.acceptEncoding == "gzip" { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + t.Fatalf("unexpected error creating gzip reader: %v", err) + } + decoder = json.NewDecoder(gzipReader) + } else { + decoder = json.NewDecoder(resp.Body) + } + var itemOut genericapitesting.SimpleList + err = decoder.Decode(&itemOut) + if err != nil { + t.Errorf("failed to read response body as SimpleList: %v", err) + } + } +} + func TestLogs(t *testing.T) { handler := handle(map[string]rest.Storage{}) server := httptest.NewServer(handler) @@ -1518,6 +1615,77 @@ func TestGet(t *testing.T) { } } +func TestGetCompression(t *testing.T) { + storage := map[string]rest.Storage{} + simpleStorage := SimpleRESTStorage{ + item: genericapitesting.Simple{ + Other: "foo", + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id", + name: "id", + namespace: "default", + } + + storage["simple"] = &simpleStorage + handler := handleLinker(storage, selfLinker) + server := httptest.NewServer(handler) + defer server.Close() + + tests := []struct { + acceptEncoding string + }{ + {acceptEncoding: ""}, + {acceptEncoding: "gzip"}, + } + + for _, test := range tests { + req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil) + if err != nil { + t.Fatalf("unexpected error cretaing request: %v", err) + } + // It's necessary to manually set Accept-Encoding here + // to prevent http.DefaultClient from automatically + // decoding responses + req.Header.Set("Accept-Encoding", test.acceptEncoding) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var decoder *json.Decoder + if test.acceptEncoding == "gzip" { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + t.Fatalf("unexpected error creating gzip reader: %v", err) + } + decoder = json.NewDecoder(gzipReader) + } else { + decoder = json.NewDecoder(resp.Body) + } + var itemOut genericapitesting.Simple + err = decoder.Decode(&itemOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("unexpected error reading body: %v", err) + } + + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } + } +} + func TestGetUninitialized(t *testing.T) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{ diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index 5a39a6ae1cf..4c46e07fc17 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -40,6 +40,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" + genericfilters "k8s.io/apiserver/pkg/server/filters" ) const ( @@ -587,6 +588,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag handler = restfulGetResource(getter, exporter, reqScope) } handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler) + handler = genericfilters.RestfulWithCompression(handler, a.group.Context) doc := "read the specified " + kind if hasSubresource { doc = "read " + subresource + " of the specified " + kind @@ -616,6 +618,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag doc = "list " + subresource + " of objects of kind " + kind } handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) + handler = genericfilters.RestfulWithCompression(handler, a.group.Context) route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index 77aa7d00763..6efce820f42 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -11,6 +11,7 @@ load( go_test( name = "go_default_test", srcs = [ + "compression_test.go", "cors_test.go", "maxinflight_test.go", "timeout_test.go", @@ -31,6 +32,7 @@ go_test( go_library( name = "go_default_library", srcs = [ + "compression.go", "cors.go", "doc.go", "longrunning.go", @@ -40,6 +42,7 @@ go_library( ], tags = ["automanaged"], deps = [ + "//vendor/github.com/emicklei/go-restful:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/compression.go b/staging/src/k8s.io/apiserver/pkg/server/filters/compression.go new file mode 100644 index 00000000000..6303ab54a5f --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/compression.go @@ -0,0 +1,183 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "compress/gzip" + "compress/zlib" + "errors" + "fmt" + "io" + "net/http" + "strings" + + "github.com/emicklei/go-restful" + + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/endpoints/request" +) + +// Compressor is an interface to compression writers +type Compressor interface { + io.WriteCloser + Flush() error +} + +const ( + headerAcceptEncoding = "Accept-Encoding" + headerContentEncoding = "Content-Encoding" + + encodingGzip = "gzip" + encodingDeflate = "deflate" +) + +// WithCompression wraps an http.Handler with the Compression Handler +func WithCompression(handler http.Handler, ctxMapper request.RequestContextMapper) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + wantsCompression, encoding := wantsCompressedResponse(req, ctxMapper) + w.Header().Set("Vary", "Accept-Encoding") + if wantsCompression { + compressionWriter, err := NewCompressionResponseWriter(w, encoding) + if err != nil { + handleError(w, req, err) + runtime.HandleError(fmt.Errorf("failed to compress HTTP response: %v", err)) + return + } + compressionWriter.Header().Set("Content-Encoding", encoding) + handler.ServeHTTP(compressionWriter, req) + compressionWriter.(*compressionResponseWriter).Close() + } else { + handler.ServeHTTP(w, req) + } + }) +} + +// wantsCompressedResponse reads the Accept-Encoding header to see if and which encoding is requested. +func wantsCompressedResponse(req *http.Request, ctxMapper request.RequestContextMapper) (bool, string) { + // don't compress watches + ctx, ok := ctxMapper.Get(req) + if !ok { + return false, "" + } + info, ok := request.RequestInfoFrom(ctx) + if !ok { + return false, "" + } + if !info.IsResourceRequest { + return false, "" + } + if info.Verb == "watch" { + return false, "" + } + header := req.Header.Get(headerAcceptEncoding) + gi := strings.Index(header, encodingGzip) + zi := strings.Index(header, encodingDeflate) + // use in order of appearance + switch { + case gi == -1: + return zi != -1, encodingDeflate + case zi == -1: + return gi != -1, encodingGzip + case gi < zi: + return true, encodingGzip + default: + return true, encodingDeflate + } +} + +type compressionResponseWriter struct { + writer http.ResponseWriter + compressor Compressor + encoding string +} + +// NewCompressionResponseWriter returns wraps w with a compression ResponseWriter, using the given encoding +func NewCompressionResponseWriter(w http.ResponseWriter, encoding string) (http.ResponseWriter, error) { + var compressor Compressor + switch encoding { + case encodingGzip: + compressor = gzip.NewWriter(w) + case encodingDeflate: + compressor = zlib.NewWriter(w) + default: + return nil, fmt.Errorf("%s is not a supported encoding type", encoding) + } + return &compressionResponseWriter{ + writer: w, + compressor: compressor, + encoding: encoding, + }, nil +} + +// compressionResponseWriter implements http.Responsewriter Interface +var _ http.ResponseWriter = &compressionResponseWriter{} + +func (c *compressionResponseWriter) Header() http.Header { + return c.writer.Header() +} + +// compress data according to compression method +func (c *compressionResponseWriter) Write(p []byte) (int, error) { + if c.compressorClosed() { + return -1, errors.New("compressing error: tried to write data using closed compressor") + } + c.Header().Set(headerContentEncoding, c.encoding) + return c.compressor.Write(p) +} + +func (c *compressionResponseWriter) WriteHeader(status int) { + c.writer.WriteHeader(status) +} + +// CloseNotify is part of http.CloseNotifier interface +func (c *compressionResponseWriter) CloseNotify() <-chan bool { + return c.writer.(http.CloseNotifier).CloseNotify() +} + +// Close the underlying compressor +func (c *compressionResponseWriter) Close() error { + if c.compressorClosed() { + return errors.New("Compressing error: tried to close already closed compressor") + } + + c.compressor.Close() + c.compressor = nil + return nil +} + +func (c *compressionResponseWriter) Flush() { + if c.compressorClosed() { + return + } + c.compressor.Flush() +} + +func (c *compressionResponseWriter) compressorClosed() bool { + return nil == c.compressor +} + +// RestfulWithCompression wraps WithCompression to be compatible with go-restful +func RestfulWithCompression(function restful.RouteFunction, ctxMapper request.RequestContextMapper) restful.RouteFunction { + return restful.RouteFunction(func(request *restful.Request, response *restful.Response) { + handler := WithCompression(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + response.ResponseWriter = w + request.Request = req + function(request, response) + }), ctxMapper) + handler.ServeHTTP(response.ResponseWriter, request.Request) + }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go new file mode 100644 index 00000000000..931c9051542 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/compression_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "bytes" + "compress/gzip" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/filters" + "k8s.io/apiserver/pkg/endpoints/request" +) + +func TestCompression(t *testing.T) { + tests := []struct { + encoding string + watch bool + }{ + {"", false}, + {"gzip", true}, + {"gzip", false}, + } + + responseData := []byte("1234") + + requestContextMapper := request.NewRequestContextMapper() + + for _, test := range tests { + handler := WithCompression( + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write(responseData) + }), + requestContextMapper, + ) + handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper) + handler = request.WithRequestContext(handler, requestContextMapper) + server := httptest.NewServer(handler) + defer server.Close() + client := http.Client{ + Transport: &http.Transport{ + DisableCompression: true, + }, + } + + url := server.URL + "/api/v1/pods" + if test.watch { + url = url + "?watch=1" + } + request, err := http.NewRequest("GET", url, nil) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + request.Header.Set("Accept-Encoding", test.encoding) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + var reader io.Reader + if test.encoding == "gzip" && !test.watch { + if response.Header.Get("Content-Encoding") != "gzip" { + t.Fatal("expected response header Content-Encoding to be set to \"gzip\"") + } + if response.Header.Get("Vary") != "Accept-Encoding" { + t.Fatal("expected response header Vary to be set to \"Accept-Encoding\"") + } + reader, err = gzip.NewReader(response.Body) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } else { + if response.Header.Get("Content-Encoding") == "gzip" { + t.Fatal("expected response header Content-Encoding not to be set") + } + reader = response.Body + } + body, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal("unexpected error: %v", err) + } + if !bytes.Equal(body, responseData) { + t.Fatalf("Expected response body %s to equal %s", body, responseData) + } + } +} + +func newTestRequestInfoResolver() *request.RequestInfoFactory { + return &request.RequestInfoFactory{ + APIPrefixes: sets.NewString("api", "apis"), + GrouplessAPIPrefixes: sets.NewString("api"), + } +}