kubelet: podresources: DOS prevention with builtin ratelimit

Implement DOS prevention by wiring a global rate limit into the
podresources API. The goal here is not to introduce a general
rate-limiting solution for the kubelet (we need more research and
discussion to get there), but rather to prevent misuse of the API.

Known limitations:
- the rate limit values (QPS, burst tokens) are hardcoded to
  "high enough" values. Enabling user configuration would require
  more discussion and sweeping changes to the other kubelet
  endpoints, so it is postponed for now.
- the rate limiting is global. Malicious clients can starve other
  clients by consuming the QPS quota.

Add an e2e test to exercise the flow, since the wiring itself
is mostly boilerplate and API adaptation.
Francesco Romani 2023-02-21 08:48:57 +01:00
parent 09517c27c4
commit b837a0c1ff
3 changed files with 111 additions and 1 deletion


@@ -0,0 +1,67 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc

import (
	"context"

	gotimerate "golang.org/x/time/rate"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"k8s.io/klog/v2"
)
const (
	// DefaultQPS is determined by empirically reviewing known consumers of the API.
	// It is unlikely that there is a legitimate need to query podresources more
	// than 100 times per second; the other subsystems are not guaranteed to react
	// that fast in the first place.
	DefaultQPS = 100

	// DefaultBurstTokens is determined by empirically reviewing known consumers
	// of the API. See the documentation of DefaultQPS; the same caveats apply.
	DefaultBurstTokens = 10
)
var (
	// ErrorLimitExceeded is returned to clients whose requests are rejected by the rate limiter.
	ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
)

// Limiter defines the interface to perform request rate limiting,
// based on the interface exposed by https://pkg.go.dev/golang.org/x/time/rate#Limiter
type Limiter interface {
	// Allow reports whether an event may happen now.
	Allow() bool
}

// LimiterUnaryServerInterceptor returns a new unary server interceptor that performs request rate limiting.
func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if !limiter.Allow() {
			return nil, ErrorLimitExceeded
		}
		return handler(ctx, req)
	}
}
// WithRateLimiter returns a grpc.ServerOption that installs the rate-limiting unary interceptor.
func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
	qpsVal := gotimerate.Limit(qps)
	burstVal := int(burstTokens)
	klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
	return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
}
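
For illustration, here is a minimal, self-contained sketch (not part of this commit) of the interceptor contract with the x/time/rate token bucket: with a refill rate of 1 QPS and a burst of 2 tokens, the first two back-to-back calls reach the handler and the rest are rejected with codes.ResourceExhausted. The limiter values and the demo handler are made up for the example.

package main

import (
	"context"
	"fmt"

	gotimerate "golang.org/x/time/rate"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// limiter mirrors the Limiter interface from the commit.
type limiter interface{ Allow() bool }

// interceptor mirrors LimiterUnaryServerInterceptor from the commit.
func interceptor(l limiter) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if !l.Allow() {
			return nil, status.Error(codes.ResourceExhausted, "rejected by rate limit")
		}
		return handler(ctx, req)
	}
}

func main() {
	// token bucket: refills 1 token per second, bucket holds 2 tokens
	ic := interceptor(gotimerate.NewLimiter(gotimerate.Limit(1), 2))
	handler := func(ctx context.Context, req interface{}) (interface{}, error) { return "ok", nil }

	for i := 0; i < 4; i++ {
		// calls 0 and 1 consume the burst; calls 2 and 3 are throttled
		_, err := ic(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "/demo"}, handler)
		fmt.Printf("call %d: err=%v\n", i, err)
	}
}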


@@ -75,6 +75,7 @@ import (
 	"k8s.io/kubernetes/pkg/features"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
@@ -218,7 +219,7 @@ type PodResourcesProviders struct {
 // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
 func ListenAndServePodResources(socket string, providers PodResourcesProviders) {
-	server := grpc.NewServer()
+	server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
 	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers.Pods, providers.Devices))
 	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers.Pods, providers.Devices, providers.Cpus, providers.Memory))
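
On the consumer side, a throttled call surfaces as a gRPC status. Here is a hedged sketch (not part of this commit) of how a podresources client could detect the rejection and retry with backoff; the helper name, the retry policy, and the kubeletpodresourcesv1 alias for k8s.io/kubelet/pkg/apis/podresources/v1 are illustrative assumptions:

import (
	"context"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)

// listWithBackoff is a hypothetical helper: it retries List() calls that were
// rejected by the server-side rate limit, backing off exponentially.
func listWithBackoff(ctx context.Context, cli kubeletpodresourcesv1.PodResourcesListerClient) (*kubeletpodresourcesv1.ListPodResourcesResponse, error) {
	var lastErr error
	for attempt := 0; attempt < 5; attempt++ {
		resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
		if status.Code(err) != codes.ResourceExhausted {
			// success, or an error unrelated to throttling: return as-is
			return resp, err
		}
		lastErr = err
		// throttled: wait before the next attempt (100ms, 200ms, 400ms, ...)
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(time.Duration(1<<attempt) * 100 * time.Millisecond):
		}
	}
	return nil, lastErr
}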


@@ -18,6 +18,7 @@ package e2enode
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -31,6 +32,7 @@ import (
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/util"
@@ -861,7 +863,47 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:PodResources]", func() {
 			ginkgo.By("Ensuring the metrics match the expectations a few more times")
 			gomega.Consistently(ctx, getPodResourcesMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
 		})
+	})
+
+	ginkgo.Context("with the builtin rate limit values", func() {
+		ginkgo.It("should hit throttling when calling podresources List in a tight loop", func(ctx context.Context) {
+			// ensure APIs have been called at least once
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Connecting to the kubelet endpoint")
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err)
+			defer conn.Close()
+
+			tries := podresourcesgrpc.DefaultQPS * 2 // this should also be greater than DefaultBurstTokens
+			errs := []error{}
+
+			ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
+			startTime := time.Now()
+			for try := 0; try < tries; try++ {
+				_, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
+				errs = append(errs, err)
+			}
+			elapsed := time.Since(startTime)
+
+			ginkgo.By(fmt.Sprintf("Checking return codes for %d List() calls in %v", tries, elapsed))
+			framework.ExpectNoError(errs[0], "the first List() call unexpectedly failed with %v", errs[0])
+			// We would expect (burst) successes and then (tries-burst) errors on a clean test
+			// environment running with enough CPU power. CI is usually harsher, so we relax the
+			// constraints, expecting at least _a_ failure while we are likely to get many more.
+			// But we cannot yet predict how many more to expect, so we prefer to relax the
+			// constraints rather than risk flakes at this stage.
+			errLimitExceededCount := 0
+			for _, err := range errs[1:] {
+				if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
+					errLimitExceededCount++
+				}
+			}
+			gomega.Expect(errLimitExceededCount).ToNot(gomega.BeZero(), "never hit the rate limit trying %d calls in %v", tries, elapsed)
+			framework.Logf("got %d/%d rate limit errors, at least one needed, the more the better", errLimitExceededCount, tries)
+		})
 	})
 })
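
For intuition on the relaxed assertion above, token-bucket arithmetic gives a rough upper bound on how many calls can succeed. This is a sketch for reasoning, not code from the commit:

// With a bucket of size burst refilled at qps tokens per second, a tight loop
// lasting elapsed can see at most roughly burst + qps*elapsed successful calls;
// every call beyond that is rejected with ErrorLimitExceeded.
func maxExpectedSuccesses(qps, burst int, elapsed time.Duration) int {
	return burst + int(float64(qps)*elapsed.Seconds())
}

With the hardcoded defaults (QPS=100, burst=10), a loop of 200 calls completing in well under a second can succeed at most a few dozen times, so at least one rejection is guaranteed, which is exactly what the test asserts.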