mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Merge pull request #46966 from ilackarms/compression-gating
Automatic merge from submit-queue (batch tested with PRs 47883, 47179, 46966, 47982, 47945) Add feature gating to REST Compression **What this PR does / why we need it**: Adds feature gating to opt out of REST API compression **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #46963 **Special notes for your reviewer**: This PR is a fix / addendum to #45666 **Release note**: ```release-note ```
This commit is contained in:
commit
80af10c0e6
@ -55,6 +55,7 @@ go_test(
|
|||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -84,5 +85,6 @@ go_library(
|
|||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -18,6 +18,7 @@ package endpoints
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -66,6 +67,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
|
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
|
"k8s.io/apiserver/pkg/server/filters"
|
||||||
)
|
)
|
||||||
|
|
||||||
// alwaysAdmit is an implementation of admission.Interface which always says yes to an admit request.
|
// alwaysAdmit is an implementation of admission.Interface which always says yes to an admit request.
|
||||||
@ -1207,6 +1209,110 @@ func TestRequestsWithInvalidQuery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListCompression(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
url string
|
||||||
|
namespace string
|
||||||
|
selfLink string
|
||||||
|
legacy bool
|
||||||
|
label string
|
||||||
|
field string
|
||||||
|
acceptEncoding string
|
||||||
|
}{
|
||||||
|
// list items in a namespace in the path
|
||||||
|
{
|
||||||
|
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
|
||||||
|
namespace: "default",
|
||||||
|
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
|
||||||
|
acceptEncoding: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
|
||||||
|
namespace: "default",
|
||||||
|
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
|
||||||
|
acceptEncoding: "gzip",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, testCase := range testCases {
|
||||||
|
storage := map[string]rest.Storage{}
|
||||||
|
simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace}
|
||||||
|
storage["simple"] = &simpleStorage
|
||||||
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
namespace: testCase.namespace,
|
||||||
|
expectedSet: testCase.selfLink,
|
||||||
|
}
|
||||||
|
var handler = handleInternal(storage, admissionControl, selfLinker, nil)
|
||||||
|
|
||||||
|
requestContextMapper = request.NewRequestContextMapper()
|
||||||
|
|
||||||
|
handler = filters.WithCompression(handler, requestContextMapper)
|
||||||
|
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
|
||||||
|
handler = request.WithRequestContext(handler, requestContextMapper)
|
||||||
|
|
||||||
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", server.URL+testCase.url, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%d: unexpected error: %v", i, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// It's necessary to manually set Accept-Encoding here
|
||||||
|
// to prevent http.DefaultClient from automatically
|
||||||
|
// decoding responses
|
||||||
|
req.Header.Set("Accept-Encoding", testCase.acceptEncoding)
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%d: unexpected error: %v", i, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
t.Errorf("%d: unexpected status: %d from url %s, Expected: %d, %#v", i, resp.StatusCode, testCase.url, http.StatusOK, resp)
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%d: unexpected error: %v", i, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.Logf("%d: body: %s", i, string(body))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// TODO: future, restore get links
|
||||||
|
if !selfLinker.called {
|
||||||
|
t.Errorf("%d: never set self link", i)
|
||||||
|
}
|
||||||
|
if !simpleStorage.namespacePresent {
|
||||||
|
t.Errorf("%d: namespace not set", i)
|
||||||
|
} else if simpleStorage.actualNamespace != testCase.namespace {
|
||||||
|
t.Errorf("%d: %q unexpected resource namespace: %s", i, testCase.url, simpleStorage.actualNamespace)
|
||||||
|
}
|
||||||
|
if simpleStorage.requestedLabelSelector == nil || simpleStorage.requestedLabelSelector.String() != testCase.label {
|
||||||
|
t.Errorf("%d: unexpected label selector: %v", i, simpleStorage.requestedLabelSelector)
|
||||||
|
}
|
||||||
|
if simpleStorage.requestedFieldSelector == nil || simpleStorage.requestedFieldSelector.String() != testCase.field {
|
||||||
|
t.Errorf("%d: unexpected field selector: %v", i, simpleStorage.requestedFieldSelector)
|
||||||
|
}
|
||||||
|
|
||||||
|
var decoder *json.Decoder
|
||||||
|
if testCase.acceptEncoding == "gzip" {
|
||||||
|
gzipReader, err := gzip.NewReader(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating gzip reader: %v", err)
|
||||||
|
}
|
||||||
|
decoder = json.NewDecoder(gzipReader)
|
||||||
|
} else {
|
||||||
|
decoder = json.NewDecoder(resp.Body)
|
||||||
|
}
|
||||||
|
var itemOut genericapitesting.SimpleList
|
||||||
|
err = decoder.Decode(&itemOut)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to read response body as SimpleList: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestLogs(t *testing.T) {
|
func TestLogs(t *testing.T) {
|
||||||
handler := handle(map[string]rest.Storage{})
|
handler := handle(map[string]rest.Storage{})
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
@ -1522,6 +1628,82 @@ func TestGet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetCompression(t *testing.T) {
|
||||||
|
storage := map[string]rest.Storage{}
|
||||||
|
simpleStorage := SimpleRESTStorage{
|
||||||
|
item: genericapitesting.Simple{
|
||||||
|
Other: "foo",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
|
||||||
|
name: "id",
|
||||||
|
namespace: "default",
|
||||||
|
}
|
||||||
|
|
||||||
|
requestContextMapper = request.NewRequestContextMapper()
|
||||||
|
|
||||||
|
storage["simple"] = &simpleStorage
|
||||||
|
handler := handleLinker(storage, selfLinker)
|
||||||
|
handler = filters.WithCompression(handler, requestContextMapper)
|
||||||
|
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
|
||||||
|
handler = request.WithRequestContext(handler, requestContextMapper)
|
||||||
|
server := httptest.NewServer(handler)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
acceptEncoding string
|
||||||
|
}{
|
||||||
|
{acceptEncoding: ""},
|
||||||
|
{acceptEncoding: "gzip"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error cretaing request: %v", err)
|
||||||
|
}
|
||||||
|
// It's necessary to manually set Accept-Encoding here
|
||||||
|
// to prevent http.DefaultClient from automatically
|
||||||
|
// decoding responses
|
||||||
|
req.Header.Set("Accept-Encoding", test.acceptEncoding)
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected response: %#v", resp)
|
||||||
|
}
|
||||||
|
var decoder *json.Decoder
|
||||||
|
if test.acceptEncoding == "gzip" {
|
||||||
|
gzipReader, err := gzip.NewReader(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating gzip reader: %v", err)
|
||||||
|
}
|
||||||
|
decoder = json.NewDecoder(gzipReader)
|
||||||
|
} else {
|
||||||
|
decoder = json.NewDecoder(resp.Body)
|
||||||
|
}
|
||||||
|
var itemOut genericapitesting.Simple
|
||||||
|
err = decoder.Decode(&itemOut)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error reading body: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if itemOut.Name != simpleStorage.item.Name {
|
||||||
|
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
|
||||||
|
}
|
||||||
|
if !selfLinker.called {
|
||||||
|
t.Errorf("Never set self link")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetUninitialized(t *testing.T) {
|
func TestGetUninitialized(t *testing.T) {
|
||||||
storage := map[string]rest.Storage{}
|
storage := map[string]rest.Storage{}
|
||||||
simpleStorage := SimpleRESTStorage{
|
simpleStorage := SimpleRESTStorage{
|
||||||
|
@ -87,6 +87,10 @@ type APIGroupVersion struct {
|
|||||||
// ResourceLister is an interface that knows how to list resources
|
// ResourceLister is an interface that knows how to list resources
|
||||||
// for this API Group.
|
// for this API Group.
|
||||||
ResourceLister discovery.APIResourceLister
|
ResourceLister discovery.APIResourceLister
|
||||||
|
|
||||||
|
// EnableAPIResponseCompression indicates whether API Responses should support compression
|
||||||
|
// if the client requests it via Accept-Encoding
|
||||||
|
EnableAPIResponseCompression bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
|
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
|
||||||
@ -138,9 +142,10 @@ func (g *APIGroupVersion) UpdateREST(container *restful.Container) error {
|
|||||||
func (g *APIGroupVersion) newInstaller() *APIInstaller {
|
func (g *APIGroupVersion) newInstaller() *APIInstaller {
|
||||||
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
|
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
|
||||||
installer := &APIInstaller{
|
installer := &APIInstaller{
|
||||||
group: g,
|
group: g,
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
minRequestTimeout: g.MinRequestTimeout,
|
minRequestTimeout: g.MinRequestTimeout,
|
||||||
|
enableAPIResponseCompression: g.EnableAPIResponseCompression,
|
||||||
}
|
}
|
||||||
return installer
|
return installer
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
|
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -48,9 +49,10 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type APIInstaller struct {
|
type APIInstaller struct {
|
||||||
group *APIGroupVersion
|
group *APIGroupVersion
|
||||||
prefix string // Path prefix where API resources are to be registered.
|
prefix string // Path prefix where API resources are to be registered.
|
||||||
minRequestTimeout time.Duration
|
minRequestTimeout time.Duration
|
||||||
|
enableAPIResponseCompression bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Struct capturing information about an action ("GET", "POST", "WATCH", "PROXY", etc).
|
// Struct capturing information about an action ("GET", "POST", "WATCH", "PROXY", etc).
|
||||||
@ -584,6 +586,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
|||||||
handler = restfulGetResource(getter, exporter, reqScope)
|
handler = restfulGetResource(getter, exporter, reqScope)
|
||||||
}
|
}
|
||||||
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler)
|
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler)
|
||||||
|
if a.enableAPIResponseCompression {
|
||||||
|
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
|
||||||
|
}
|
||||||
doc := "read the specified " + kind
|
doc := "read the specified " + kind
|
||||||
if hasSubresource {
|
if hasSubresource {
|
||||||
doc = "read " + subresource + " of the specified " + kind
|
doc = "read " + subresource + " of the specified " + kind
|
||||||
@ -613,6 +618,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
|||||||
doc = "list " + subresource + " of objects of kind " + kind
|
doc = "list " + subresource + " of objects of kind " + kind
|
||||||
}
|
}
|
||||||
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
|
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
|
||||||
|
if a.enableAPIResponseCompression {
|
||||||
|
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
|
||||||
|
}
|
||||||
route := ws.GET(action.Path).To(handler).
|
route := ws.GET(action.Path).To(handler).
|
||||||
Doc(doc).
|
Doc(doc).
|
||||||
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
|
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
|
||||||
|
@ -41,6 +41,12 @@ const (
|
|||||||
// pluggable output backends and an audit policy specifying how different requests should be
|
// pluggable output backends and an audit policy specifying how different requests should be
|
||||||
// audited.
|
// audited.
|
||||||
AdvancedAuditing utilfeature.Feature = "AdvancedAuditing"
|
AdvancedAuditing utilfeature.Feature = "AdvancedAuditing"
|
||||||
|
|
||||||
|
// owner: @ilackams
|
||||||
|
// alpha: v1.7
|
||||||
|
//
|
||||||
|
// Enables compression of REST responses (GET and LIST only)
|
||||||
|
APIResponseCompression utilfeature.Feature = "APIResponseCompression"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -53,4 +59,5 @@ func init() {
|
|||||||
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
|
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
|
||||||
StreamingProxyRedirects: {Default: true, PreRelease: utilfeature.Beta},
|
StreamingProxyRedirects: {Default: true, PreRelease: utilfeature.Beta},
|
||||||
AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha},
|
AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha},
|
||||||
|
APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
|
||||||
}
|
}
|
||||||
|
@ -160,6 +160,10 @@ type Config struct {
|
|||||||
// Predicate which is true for paths of long-running http requests
|
// Predicate which is true for paths of long-running http requests
|
||||||
LongRunningFunc apirequest.LongRunningRequestCheck
|
LongRunningFunc apirequest.LongRunningRequestCheck
|
||||||
|
|
||||||
|
// EnableAPIResponseCompression indicates whether API Responses should support compression
|
||||||
|
// if the client requests it via Accept-Encoding
|
||||||
|
EnableAPIResponseCompression bool
|
||||||
|
|
||||||
//===========================================================================
|
//===========================================================================
|
||||||
// values below here are targets for removal
|
// values below here are targets for removal
|
||||||
//===========================================================================
|
//===========================================================================
|
||||||
@ -206,19 +210,20 @@ type SecureServingInfo struct {
|
|||||||
// NewConfig returns a Config struct with the default values
|
// NewConfig returns a Config struct with the default values
|
||||||
func NewConfig(codecs serializer.CodecFactory) *Config {
|
func NewConfig(codecs serializer.CodecFactory) *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
Serializer: codecs,
|
Serializer: codecs,
|
||||||
ReadWritePort: 443,
|
ReadWritePort: 443,
|
||||||
RequestContextMapper: apirequest.NewRequestContextMapper(),
|
RequestContextMapper: apirequest.NewRequestContextMapper(),
|
||||||
BuildHandlerChainFunc: DefaultBuildHandlerChain,
|
BuildHandlerChainFunc: DefaultBuildHandlerChain,
|
||||||
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
||||||
DisabledPostStartHooks: sets.NewString(),
|
DisabledPostStartHooks: sets.NewString(),
|
||||||
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
|
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
|
||||||
EnableIndex: true,
|
EnableIndex: true,
|
||||||
EnableDiscovery: true,
|
EnableDiscovery: true,
|
||||||
EnableProfiling: true,
|
EnableProfiling: true,
|
||||||
MaxRequestsInFlight: 400,
|
MaxRequestsInFlight: 400,
|
||||||
MaxMutatingRequestsInFlight: 200,
|
MaxMutatingRequestsInFlight: 200,
|
||||||
MinRequestTimeout: 1800,
|
MinRequestTimeout: 1800,
|
||||||
|
EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression),
|
||||||
|
|
||||||
// Default to treating watch as a long-running operation
|
// Default to treating watch as a long-running operation
|
||||||
// Generic API servers have no inherent long-running subresources
|
// Generic API servers have no inherent long-running subresources
|
||||||
@ -412,6 +417,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
healthzChecks: c.HealthzChecks,
|
healthzChecks: c.HealthzChecks,
|
||||||
|
|
||||||
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer, c.RequestContextMapper),
|
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer, c.RequestContextMapper),
|
||||||
|
|
||||||
|
enableAPIResponseCompression: c.EnableAPIResponseCompression,
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range delegationTarget.PostStartHooks() {
|
for k, v := range delegationTarget.PostStartHooks() {
|
||||||
|
@ -11,6 +11,7 @@ load(
|
|||||||
go_test(
|
go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = [
|
srcs = [
|
||||||
|
"compression_test.go",
|
||||||
"cors_test.go",
|
"cors_test.go",
|
||||||
"maxinflight_test.go",
|
"maxinflight_test.go",
|
||||||
"timeout_test.go",
|
"timeout_test.go",
|
||||||
@ -31,6 +32,7 @@ go_test(
|
|||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
|
"compression.go",
|
||||||
"cors.go",
|
"cors.go",
|
||||||
"doc.go",
|
"doc.go",
|
||||||
"longrunning.go",
|
"longrunning.go",
|
||||||
@ -40,6 +42,7 @@ go_library(
|
|||||||
],
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
|
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
|
183
staging/src/k8s.io/apiserver/pkg/server/filters/compression.go
Normal file
183
staging/src/k8s.io/apiserver/pkg/server/filters/compression.go
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package filters
|
||||||
|
|
||||||
|
import (
|
||||||
|
"compress/gzip"
|
||||||
|
"compress/zlib"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/emicklei/go-restful"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Compressor is an interface to compression writers
|
||||||
|
type Compressor interface {
|
||||||
|
io.WriteCloser
|
||||||
|
Flush() error
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
headerAcceptEncoding = "Accept-Encoding"
|
||||||
|
headerContentEncoding = "Content-Encoding"
|
||||||
|
|
||||||
|
encodingGzip = "gzip"
|
||||||
|
encodingDeflate = "deflate"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WithCompression wraps an http.Handler with the Compression Handler
|
||||||
|
func WithCompression(handler http.Handler, ctxMapper request.RequestContextMapper) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
wantsCompression, encoding := wantsCompressedResponse(req, ctxMapper)
|
||||||
|
w.Header().Set("Vary", "Accept-Encoding")
|
||||||
|
if wantsCompression {
|
||||||
|
compressionWriter, err := NewCompressionResponseWriter(w, encoding)
|
||||||
|
if err != nil {
|
||||||
|
handleError(w, req, err)
|
||||||
|
runtime.HandleError(fmt.Errorf("failed to compress HTTP response: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
compressionWriter.Header().Set("Content-Encoding", encoding)
|
||||||
|
handler.ServeHTTP(compressionWriter, req)
|
||||||
|
compressionWriter.(*compressionResponseWriter).Close()
|
||||||
|
} else {
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// wantsCompressedResponse reads the Accept-Encoding header to see if and which encoding is requested.
|
||||||
|
func wantsCompressedResponse(req *http.Request, ctxMapper request.RequestContextMapper) (bool, string) {
|
||||||
|
// don't compress watches
|
||||||
|
ctx, ok := ctxMapper.Get(req)
|
||||||
|
if !ok {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
info, ok := request.RequestInfoFrom(ctx)
|
||||||
|
if !ok {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
if !info.IsResourceRequest {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
if info.Verb == "watch" {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
header := req.Header.Get(headerAcceptEncoding)
|
||||||
|
gi := strings.Index(header, encodingGzip)
|
||||||
|
zi := strings.Index(header, encodingDeflate)
|
||||||
|
// use in order of appearance
|
||||||
|
switch {
|
||||||
|
case gi == -1:
|
||||||
|
return zi != -1, encodingDeflate
|
||||||
|
case zi == -1:
|
||||||
|
return gi != -1, encodingGzip
|
||||||
|
case gi < zi:
|
||||||
|
return true, encodingGzip
|
||||||
|
default:
|
||||||
|
return true, encodingDeflate
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type compressionResponseWriter struct {
|
||||||
|
writer http.ResponseWriter
|
||||||
|
compressor Compressor
|
||||||
|
encoding string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCompressionResponseWriter returns wraps w with a compression ResponseWriter, using the given encoding
|
||||||
|
func NewCompressionResponseWriter(w http.ResponseWriter, encoding string) (http.ResponseWriter, error) {
|
||||||
|
var compressor Compressor
|
||||||
|
switch encoding {
|
||||||
|
case encodingGzip:
|
||||||
|
compressor = gzip.NewWriter(w)
|
||||||
|
case encodingDeflate:
|
||||||
|
compressor = zlib.NewWriter(w)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("%s is not a supported encoding type", encoding)
|
||||||
|
}
|
||||||
|
return &compressionResponseWriter{
|
||||||
|
writer: w,
|
||||||
|
compressor: compressor,
|
||||||
|
encoding: encoding,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// compressionResponseWriter implements http.Responsewriter Interface
|
||||||
|
var _ http.ResponseWriter = &compressionResponseWriter{}
|
||||||
|
|
||||||
|
func (c *compressionResponseWriter) Header() http.Header {
|
||||||
|
return c.writer.Header()
|
||||||
|
}
|
||||||
|
|
||||||
|
// compress data according to compression method
|
||||||
|
func (c *compressionResponseWriter) Write(p []byte) (int, error) {
|
||||||
|
if c.compressorClosed() {
|
||||||
|
return -1, errors.New("compressing error: tried to write data using closed compressor")
|
||||||
|
}
|
||||||
|
c.Header().Set(headerContentEncoding, c.encoding)
|
||||||
|
return c.compressor.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *compressionResponseWriter) WriteHeader(status int) {
|
||||||
|
c.writer.WriteHeader(status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseNotify is part of http.CloseNotifier interface
|
||||||
|
func (c *compressionResponseWriter) CloseNotify() <-chan bool {
|
||||||
|
return c.writer.(http.CloseNotifier).CloseNotify()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the underlying compressor
|
||||||
|
func (c *compressionResponseWriter) Close() error {
|
||||||
|
if c.compressorClosed() {
|
||||||
|
return errors.New("Compressing error: tried to close already closed compressor")
|
||||||
|
}
|
||||||
|
|
||||||
|
c.compressor.Close()
|
||||||
|
c.compressor = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *compressionResponseWriter) Flush() {
|
||||||
|
if c.compressorClosed() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.compressor.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *compressionResponseWriter) compressorClosed() bool {
|
||||||
|
return nil == c.compressor
|
||||||
|
}
|
||||||
|
|
||||||
|
// RestfulWithCompression wraps WithCompression to be compatible with go-restful
|
||||||
|
func RestfulWithCompression(function restful.RouteFunction, ctxMapper request.RequestContextMapper) restful.RouteFunction {
|
||||||
|
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
|
||||||
|
handler := WithCompression(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
response.ResponseWriter = w
|
||||||
|
request.Request = req
|
||||||
|
function(request, response)
|
||||||
|
}), ctxMapper)
|
||||||
|
handler.ServeHTTP(response.ResponseWriter, request.Request)
|
||||||
|
})
|
||||||
|
}
|
@ -0,0 +1,110 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package filters
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/filters"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCompression(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
encoding string
|
||||||
|
watch bool
|
||||||
|
}{
|
||||||
|
{"", false},
|
||||||
|
{"gzip", true},
|
||||||
|
{"gzip", false},
|
||||||
|
}
|
||||||
|
|
||||||
|
responseData := []byte("1234")
|
||||||
|
|
||||||
|
requestContextMapper := request.NewRequestContextMapper()
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
handler := WithCompression(
|
||||||
|
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
w.Write(responseData)
|
||||||
|
}),
|
||||||
|
requestContextMapper,
|
||||||
|
)
|
||||||
|
handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
|
||||||
|
handler = request.WithRequestContext(handler, requestContextMapper)
|
||||||
|
server := httptest.NewServer(handler)
|
||||||
|
defer server.Close()
|
||||||
|
client := http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DisableCompression: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
url := server.URL + "/api/v1/pods"
|
||||||
|
if test.watch {
|
||||||
|
url = url + "?watch=1"
|
||||||
|
}
|
||||||
|
request, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
request.Header.Set("Accept-Encoding", test.encoding)
|
||||||
|
response, err := client.Do(request)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
var reader io.Reader
|
||||||
|
if test.encoding == "gzip" && !test.watch {
|
||||||
|
if response.Header.Get("Content-Encoding") != "gzip" {
|
||||||
|
t.Fatal("expected response header Content-Encoding to be set to \"gzip\"")
|
||||||
|
}
|
||||||
|
if response.Header.Get("Vary") != "Accept-Encoding" {
|
||||||
|
t.Fatal("expected response header Vary to be set to \"Accept-Encoding\"")
|
||||||
|
}
|
||||||
|
reader, err = gzip.NewReader(response.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if response.Header.Get("Content-Encoding") == "gzip" {
|
||||||
|
t.Fatal("expected response header Content-Encoding not to be set")
|
||||||
|
}
|
||||||
|
reader = response.Body
|
||||||
|
}
|
||||||
|
body, err := ioutil.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(body, responseData) {
|
||||||
|
t.Fatalf("Expected response body %s to equal %s", body, responseData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestRequestInfoResolver() *request.RequestInfoFactory {
|
||||||
|
return &request.RequestInfoFactory{
|
||||||
|
APIPrefixes: sets.NewString("api", "apis"),
|
||||||
|
GrouplessAPIPrefixes: sets.NewString("api"),
|
||||||
|
}
|
||||||
|
}
|
@ -150,6 +150,10 @@ type GenericAPIServer struct {
|
|||||||
|
|
||||||
// auditing. The backend is started after the server starts listening.
|
// auditing. The backend is started after the server starts listening.
|
||||||
AuditBackend audit.Backend
|
AuditBackend audit.Backend
|
||||||
|
|
||||||
|
// enableAPIResponseCompression indicates whether API Responses should support compression
|
||||||
|
// if the client requests it via Accept-Encoding
|
||||||
|
enableAPIResponseCompression bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
||||||
@ -431,9 +435,10 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
|
|||||||
Linker: apiGroupInfo.GroupMeta.SelfLinker,
|
Linker: apiGroupInfo.GroupMeta.SelfLinker,
|
||||||
Mapper: apiGroupInfo.GroupMeta.RESTMapper,
|
Mapper: apiGroupInfo.GroupMeta.RESTMapper,
|
||||||
|
|
||||||
Admit: s.admissionControl,
|
Admit: s.admissionControl,
|
||||||
Context: s.RequestContextMapper(),
|
Context: s.RequestContextMapper(),
|
||||||
MinRequestTimeout: s.minRequestTimeout,
|
MinRequestTimeout: s.minRequestTimeout,
|
||||||
|
EnableAPIResponseCompression: s.enableAPIResponseCompression,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user