diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/disable_compression.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/disable_compression.go new file mode 100644 index 00000000000..08c17a89e22 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/disable_compression.go @@ -0,0 +1,43 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "fmt" + "net/http" + + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/request" +) + +// CompressionDisabledFunc checks if a given request should disable compression. +type CompressionDisabledFunc func(*http.Request) (bool, error) + +// WithCompressionDisabled stores result of CompressionDisabledFunc in context. +func WithCompressionDisabled(handler http.Handler, predicate CompressionDisabledFunc) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + decision, err := predicate(req) + if err != nil { + responsewriters.InternalError(w, req, fmt.Errorf("failed to determine if request should disable compression: %v", err)) + return + } + + req = req.WithContext(request.WithCompressionDisabled(ctx, decision)) + handler.ServeHTTP(w, req) + }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/disable_compression_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/disable_compression_test.go new file mode 100644 index 00000000000..3422d95aaed --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/disable_compression_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "errors" + "net/http" + "net/http/httptest" + "testing" + + "k8s.io/apiserver/pkg/endpoints/request" +) + +func TestWithCompressionDisabled(t *testing.T) { + tests := []struct { + name string + checkDecision bool + checkErr error + want bool + wantHandlerCalled bool + }{ + { + name: "decision=true", + checkDecision: true, + want: true, + wantHandlerCalled: true, + }, + { + name: "decision=false", + checkDecision: false, + want: false, + wantHandlerCalled: true, + }, + { + name: "check error", + checkErr: errors.New("check failed"), + want: false, + wantHandlerCalled: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handlerCalled := false + handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) { + handlerCalled = true + if got, want := request.CompressionDisabledFrom(req.Context()), tt.checkDecision; got != want { + t.Errorf("request.CompressionDisabledFrom(req.Context())=%v; want=%v", got, want) + } + }) + fake := func(*http.Request) (bool, error) { + return tt.checkDecision, tt.checkErr + } + wrapped := WithCompressionDisabled(handler, fake) + testRequest, err := http.NewRequest(http.MethodGet, "/path", nil) + if err != nil { + t.Fatal(err) + } + w := httptest.NewRecorder() + wrapped.ServeHTTP(w, testRequest) + if handlerCalled != tt.wantHandlerCalled { + t.Errorf("expected handlerCalled=%v, got=%v", handlerCalled, tt.wantHandlerCalled) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index acb08800e11..e203d5514f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -87,19 +87,21 @@ func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSe // The context is optional and can be nil. This method will perform optional content compression if requested by // a client and the feature gate for APIResponseCompression is enabled. func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { + disableCompression := request.CompressionDisabledFrom(req.Context()) trace := utiltrace.New("SerializeObject", utiltrace.Field{"audit-id", request.GetAuditIDTruncated(req.Context())}, utiltrace.Field{"method", req.Method}, utiltrace.Field{"url", req.URL.Path}, utiltrace.Field{"protocol", req.Proto}, utiltrace.Field{"mediaType", mediaType}, - utiltrace.Field{"encoder", encoder.Identifier()}) + utiltrace.Field{"encoder", encoder.Identifier()}, + utiltrace.Field{"disableCompression", disableCompression}) defer trace.LogIfLong(5 * time.Second) w := &deferredResponseWriter{ mediaType: mediaType, statusCode: statusCode, - contentEncoding: negotiateContentEncoding(req), + contentEncoding: negotiateContentEncoding(req, disableCompression), hw: hw, trace: trace, } @@ -155,12 +157,12 @@ const ( // negotiateContentEncoding returns a supported client-requested content encoding for the // provided request. It will return the empty string if no supported content encoding was // found or if response compression is disabled. -func negotiateContentEncoding(req *http.Request) string { +func negotiateContentEncoding(req *http.Request, disableCompression bool) string { encoding := req.Header.Get("Accept-Encoding") if len(encoding) == 0 { return "" } - if !utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression) { + if !utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression) || disableCompression { return "" } for len(encoding) > 0 { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index febb88a7da2..044723c3829 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -19,6 +19,7 @@ package responsewriters import ( "bytes" "compress/gzip" + "context" "encoding/hex" "encoding/json" "errors" @@ -41,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -231,7 +233,7 @@ func TestSerializeObject(t *testing.T) { }, { - name: "compress on gzip", + name: "compress on gzip, request's context has no decision", compressionEnabled: true, out: largePayload, mediaType: "application/json", @@ -249,6 +251,40 @@ func TestSerializeObject(t *testing.T) { }, wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel), }, + { + name: "compress on gzip, request's context allows compression", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: (&http.Request{ + Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }, + URL: &url.URL{Path: "/path"}, + }).WithContext(request.WithCompressionDisabled(context.Background(), false)), + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel), + }, + { + name: "compress on gzip, request's context disables compression", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: (&http.Request{ + Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }, + URL: &url.URL{Path: "/path"}, + }).WithContext(request.WithCompressionDisabled(context.Background(), true)), + wantCode: http.StatusOK, + wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, + wantBody: largePayload, + }, { name: "compression is not performed on small objects", diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/disable_compression.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/disable_compression.go new file mode 100644 index 00000000000..fa1713d4704 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/disable_compression.go @@ -0,0 +1,37 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "context" +) + +type disableCompressionIDKeyType int + +const disableCompressionIDKey disableCompressionIDKeyType = iota + +// WithCompressionDisabled stores bool in context. +func WithCompressionDisabled(parent context.Context, disableCompression bool) context.Context { + return WithValue(parent, disableCompressionIDKey, disableCompression) +} + +// CompressionDisabledFrom retrieves bool from context. +// Defaults to false if not set. +func CompressionDisabledFrom(ctx context.Context) bool { + decision, _ := ctx.Value(disableCompressionIDKey).(bool) + return decision +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/disable_compression_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/disable_compression_test.go new file mode 100644 index 00000000000..eadd622ba37 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/disable_compression_test.go @@ -0,0 +1,43 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "context" + "testing" +) + +func TestCompressionDisabled(t *testing.T) { + ctx := context.Background() + + // Default value is false. + if got, want := CompressionDisabledFrom(ctx), false; got != want { + t.Errorf("CompressionDisabledFrom(ctx) = %v; want = %v", got, want) + } + + // We retrieve stored true. + ctx = WithCompressionDisabled(ctx, true) + if got, want := CompressionDisabledFrom(ctx), true; got != want { + t.Errorf("CompressionDisabledFrom(ctx) = %v; want = %v", got, want) + } + + // We retrieve stored false. + ctx = WithCompressionDisabled(ctx, false) + if got, want := CompressionDisabledFrom(ctx), false; got != want { + t.Errorf("CompressionDisabledFrom(ctx) = %v; want = %v", got, want) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index ddff43e5370..95887fccaa8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -257,6 +257,9 @@ type Config struct { // StorageVersionManager holds the storage versions of the API resources installed by this server. StorageVersionManager storageversion.Manager + + // CompressionDisabledFunc returns whether compression should be disabled for a given request. + CompressionDisabledFunc genericapifilters.CompressionDisabledFunc } type RecommendedConfig struct { @@ -855,6 +858,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { if c.ShutdownSendRetryAfter { handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled()) } + if c.CompressionDisabledFunc != nil { + handler = genericapifilters.WithCompressionDisabled(handler, c.CompressionDisabledFunc) + } handler = genericfilters.WithHTTPLogging(handler) if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { handler = genericapifilters.WithTracing(handler, c.TracerProvider) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index c100d062019..aa388006222 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -26,7 +26,9 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/util/disablecompression" utilfeature "k8s.io/apiserver/pkg/util/feature" + netutils "k8s.io/utils/net" "github.com/spf13/pflag" ) @@ -63,21 +65,27 @@ type ServerRunOptions struct { // If enabled, after ShutdownDelayDuration elapses, any incoming request is // rejected with a 429 status code and a 'Retry-After' response. ShutdownSendRetryAfter bool + + // DisableCompressionForClientIPs is a comma separated list of CIDR IP ranges + // (parsable by net.ParseCIDR, as defined in RFC 4632 and RFC 4291) for which + // traffic compression should be disabled. + DisableCompressionForClientIPs []string } func NewServerRunOptions() *ServerRunOptions { defaults := server.NewConfig(serializer.CodecFactory{}) return &ServerRunOptions{ - MaxRequestsInFlight: defaults.MaxRequestsInFlight, - MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, - RequestTimeout: defaults.RequestTimeout, - LivezGracePeriod: defaults.LivezGracePeriod, - MinRequestTimeout: defaults.MinRequestTimeout, - ShutdownDelayDuration: defaults.ShutdownDelayDuration, - JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, - MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, - EnablePriorityAndFairness: true, - ShutdownSendRetryAfter: false, + MaxRequestsInFlight: defaults.MaxRequestsInFlight, + MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, + RequestTimeout: defaults.RequestTimeout, + LivezGracePeriod: defaults.LivezGracePeriod, + MinRequestTimeout: defaults.MinRequestTimeout, + ShutdownDelayDuration: defaults.ShutdownDelayDuration, + JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, + MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, + EnablePriorityAndFairness: true, + ShutdownSendRetryAfter: false, + DisableCompressionForClientIPs: nil, } } @@ -97,6 +105,13 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.MaxRequestBodyBytes = s.MaxRequestBodyBytes c.PublicAddress = s.AdvertiseAddress c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter + if len(s.DisableCompressionForClientIPs) != 0 { + pred, err := disablecompression.NewClientIPPredicate(s.DisableCompressionForClientIPs) + if err != nil { + return err + } + c.CompressionDisabledFunc = pred.Predicate + } return nil } @@ -161,6 +176,10 @@ func (s *ServerRunOptions) Validate() []error { if err := validateHSTSDirectives(s.HSTSDirectives); err != nil { errors = append(errors, err) } + + if _, err := netutils.ParseCIDRs(s.DisableCompressionForClientIPs); err != nil { + errors = append(errors, err) + } return errors } @@ -256,5 +275,8 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+ "in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.") + fs.StringSliceVar(&s.DisableCompressionForClientIPs, "disable-compression-for-client-ips", s.DisableCompressionForClientIPs, ""+ + "A comma separated list of client IP ranges in CIDR notation like \"192.0.2.0/24\" or \"2001:db8::/32\", as defined in RFC 4632 and RFC 4291, for which traffic compression will be disabled.") + utilfeature.DefaultMutableFeatureGate.AddFlag(fs) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/disablecompression/disablecompression.go b/staging/src/k8s.io/apiserver/pkg/util/disablecompression/disablecompression.go new file mode 100644 index 00000000000..8d4693479c0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/disablecompression/disablecompression.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disablecompression + +import ( + "fmt" + "net" + "net/http" + + utilnet "k8s.io/apimachinery/pkg/util/net" + netutils "k8s.io/utils/net" +) + +// ClientIPPredicate.Predicate implements CompressionDisabledFunc interface that decides +// based on client IP. +type ClientIPPredicate struct { + cidrs []*net.IPNet +} + +// NewClientIPPredicate creates a new ClientIPPredicate instance. +func NewClientIPPredicate(cidrStrings []string) (*ClientIPPredicate, error) { + cidrs, err := netutils.ParseCIDRs(cidrStrings) + if err != nil { + return nil, fmt.Errorf("failed to parse cidrs: %v", err) + } + return &ClientIPPredicate{cidrs: cidrs}, nil +} + +// Predicate checks if ClientIP matches any cidr. +func (c *ClientIPPredicate) Predicate(req *http.Request) (bool, error) { + ip := utilnet.GetClientIP(req) + if ip == nil { + return false, fmt.Errorf("unable to determine source IP for %v", req) + } + + for _, cidr := range c.cidrs { + if cidr.Contains(ip) { + return true, nil + } + } + + return false, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/disablecompression/disablecompression_test.go b/staging/src/k8s.io/apiserver/pkg/util/disablecompression/disablecompression_test.go new file mode 100644 index 00000000000..cfe5fe822a6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/disablecompression/disablecompression_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disablecompression + +import ( + "net/http" + "testing" +) + +func TestNewClientIPPredicate(t *testing.T) { + tests := []struct { + name string + cidrStrings []string + wantErr bool + }{ + { + name: "rfc1918", + cidrStrings: []string{"10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"}, + }, + { + name: "rfc4193 (ipv6)", + cidrStrings: []string{"fc00::/7"}, + }, + { + name: "ipv6 loopback", + cidrStrings: []string{"::1/128"}, + }, + { + name: "bad cidr", + cidrStrings: []string{"not a cidr string"}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewClientIPPredicate(tt.cidrStrings) + if (err != nil) != tt.wantErr { + t.Fatalf("NewClientIPPredicate() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantErr { + return + } + if got, want := len(got.cidrs), len(tt.cidrStrings); got != want { + t.Errorf("len(NewClientIPPredicate.cidrs()) = %v, want %v", got, want) + } + }) + } +} + +func TestClientIPPredicate_Predicate(t *testing.T) { + check, err := NewClientIPPredicate([]string{"::1/128", "10.0.0.0/8"}) + if err != nil { + t.Fatalf("failed to construct NewClientIPPredicate: %v", err) + } + tests := []struct { + name string + req *http.Request + want bool + wantErr bool + }{ + { + name: "ipv4, in range", + req: &http.Request{RemoteAddr: "10.0.0.1:123"}, + want: true, + }, + { + name: "ipv4, out of range", + req: &http.Request{RemoteAddr: "11.0.0.1:123"}, + want: false, + }, + { + name: "ipv6, in range", + req: &http.Request{RemoteAddr: "[::1]:123"}, + want: true, + }, + { + name: "ipv6, out of range", + req: &http.Request{RemoteAddr: "[::2]:123"}, + want: false, + }, + { + name: "no IP", + req: &http.Request{}, + wantErr: true, + }, + { + name: "RemoteAddr doesn't parse", + req: &http.Request{RemoteAddr: "this is definitely not an IP address and port"}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := check.Predicate(tt.req) + if (err != nil) != tt.wantErr { + t.Errorf("ClientIPPredicate.Predicate() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("ClientIPPredicate.Predicate() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 0454c98c16c..2f6a9f4dd55 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1654,6 +1654,7 @@ k8s.io/apiserver/pkg/storage/value/encrypt/secretbox k8s.io/apiserver/pkg/storageversion k8s.io/apiserver/pkg/tracing k8s.io/apiserver/pkg/util/apihelpers +k8s.io/apiserver/pkg/util/disablecompression k8s.io/apiserver/pkg/util/dryrun k8s.io/apiserver/pkg/util/feature k8s.io/apiserver/pkg/util/flowcontrol