From b837a0c1ffd90ee0d561e38e97e1ce1d13f39cf0 Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Tue, 21 Feb 2023 08:48:57 +0100
Subject: [PATCH] kubelet: podresources: DoS prevention with builtin ratelimit

Implement DoS prevention by wiring a global rate limit into the
podresources API. The goal here is not to introduce a general
rate-limiting solution for the kubelet (we need more research and
discussion to get there), but rather to prevent misuse of this API.

Known limitations:
- the rate limit values (QPS, burst tokens) are hardcoded to "high
  enough" values. Making them user-configurable would require more
  discussion and sweeping changes to the other kubelet endpoints, so
  it is postponed for now.
- the rate limit is global. Malicious clients can starve other
  clients by consuming the QPS quota.

Add an e2e test to exercise the flow, because the wiring itself is
mostly boilerplate and API adaptation.
---
 .../apis/podresources/grpc/ratelimit.go | 67 +++++++++++++++++++
 pkg/kubelet/server/server.go            |  3 +-
 test/e2e_node/podresources_test.go      | 42 ++++++++++++
 3 files changed, 111 insertions(+), 1 deletion(-)
 create mode 100644 pkg/kubelet/apis/podresources/grpc/ratelimit.go

diff --git a/pkg/kubelet/apis/podresources/grpc/ratelimit.go b/pkg/kubelet/apis/podresources/grpc/ratelimit.go
new file mode 100644
index 00000000000..b9214b13c16
--- /dev/null
+++ b/pkg/kubelet/apis/podresources/grpc/ratelimit.go
@@ -0,0 +1,67 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package grpc
+
+import (
+	"context"
+
+	gotimerate "golang.org/x/time/rate"
+	"k8s.io/klog/v2"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	// DefaultQPS is determined by empirically reviewing known consumers of the API.
+	// It's at least unlikely that there is a legitimate need to query podresources
+	// more than 100 times per second; the other subsystems are not guaranteed to
+	// react that fast in the first place.
+	DefaultQPS = 100
+	// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
+	// See the documentation of DefaultQPS; the same caveats apply.
+	DefaultBurstTokens = 10
+)
+
+var (
+	ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
+)
+
+// Limiter defines the interface to perform request rate limiting,
+// based on the interface exposed by https://pkg.go.dev/golang.org/x/time/rate#Limiter
+type Limiter interface {
+	// Allow reports whether an event may happen now.
+	Allow() bool
+}
+
+// LimiterUnaryServerInterceptor returns a new unary server interceptor that performs request rate limiting.
+func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		if !limiter.Allow() {
+			return nil, ErrorLimitExceeded
+		}
+		return handler(ctx, req)
+	}
+}
+
+func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
+	qpsVal := gotimerate.Limit(qps)
+	burstVal := int(burstTokens)
+	klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
+	return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
+}
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index 606fd52a628..74c5dba9951 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -75,6 +75,7 @@ import (
 	"k8s.io/kubernetes/pkg/features"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
@@ -218,7 +219,7 @@ type PodResourcesProviders struct {
 
 // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
 func ListenAndServePodResources(socket string, providers PodResourcesProviders) {
-	server := grpc.NewServer()
+	server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
 	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers.Pods, providers.Devices))
 	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers.Pods, providers.Devices, providers.Cpus, providers.Memory))
 
diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go
index 5cf2f834cec..086d99f6e94 100644
--- a/test/e2e_node/podresources_test.go
+++ b/test/e2e_node/podresources_test.go
@@ -18,6 +18,7 @@ package e2enode
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -31,6 +32,7 @@ import (
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/util"
@@ -861,7 +863,47 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:PodResources]", func() {
 			ginkgo.By("Ensuring the metrics match the expectations a few more times")
 			gomega.Consistently(ctx, getPodResourcesMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
 		})
+	})
 
+	ginkgo.Context("with the builtin rate limit values", func() {
+		ginkgo.It("should hit throttling when calling podresources List in a tight loop", func(ctx context.Context) {
+			// ensure APIs have been called at least once
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Connecting to the kubelet endpoint")
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err)
+			defer conn.Close()
+
+			tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
+			errs := []error{}
+
+			ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
+			startTime := time.Now()
+			for try := 0; try < tries; try++ {
+				_, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
+				errs = append(errs, err)
+			}
+			elapsed := time.Since(startTime)
+
+			ginkgo.By(fmt.Sprintf("Checking return codes for %d List() calls in %v", tries, elapsed))
+
+			framework.ExpectNoError(errs[0], "the first List() call unexpectedly failed with %v", errs[0])
+			// We would expect (burst) successes and then (tries-burst) errors on a clean test environment running with
+			// enough CPU power. CI is usually harsher. So we relax the constraints, expecting at least _a_ failure while
+			// we are likely to get many more. But we can't yet predict how many more we should expect, so we prefer to
+			// relax the constraints rather than risk flakes at this stage.
+			errLimitExceededCount := 0
+			for _, err := range errs[1:] {
+				if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
+					errLimitExceededCount++
+				}
+			}
+			gomega.Expect(errLimitExceededCount).ToNot(gomega.BeZero(), "never hit the rate limit trying %d calls in %v", tries, elapsed)
+
+			framework.Logf("got %d/%d rate limit errors, at least one needed, the more the better", errLimitExceededCount, tries)
+		})
 	})
 })
 
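Illustrative follow-up (not part of the patch): once the kubelet enforces this limit, a podresources consumer that polls too aggressively starts receiving gRPC ResourceExhausted errors, which is exactly what the e2e test above checks for. Below is a minimal client-side sketch of how such a consumer might detect the throttling and back off. The helper name listWithBackoff, the attempt count, and the sleep interval are assumptions made for illustration; the client type, List call, and ResourceExhausted code are the ones used by the patch.

package example

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)

// listWithBackoff wraps PodResourcesLister.List: when the kubelet rejects the
// call with codes.ResourceExhausted (the code carried by ErrorLimitExceeded
// above), it waits and retries instead of immediately hammering the endpoint
// again. maxAttempts and backoff are arbitrary illustrative values, not
// defaults taken from the kubelet.
func listWithBackoff(ctx context.Context, cli kubeletpodresourcesv1.PodResourcesListerClient) (*kubeletpodresourcesv1.ListPodResourcesResponse, error) {
	const (
		maxAttempts = 5
		backoff     = 100 * time.Millisecond
	)
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
		if err == nil {
			return resp, nil
		}
		if status.Code(err) != codes.ResourceExhausted {
			// Not throttled: some other failure, report it right away.
			return nil, err
		}
		// Throttled by the kubelet rate limit: back off and try again.
		lastErr = err
		time.Sleep(backoff)
	}
	return nil, fmt.Errorf("still rate limited after %d attempts: %w", maxAttempts, lastErr)
}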